@@ -41,6 +41,7 @@ import (
4141 "k8s.io/apimachinery/pkg/types"
4242 "sigs.k8s.io/cluster-api/util/patch"
4343 ctrl "sigs.k8s.io/controller-runtime"
44+ "sigs.k8s.io/controller-runtime/pkg/builder"
4445 "sigs.k8s.io/controller-runtime/pkg/client"
4546 "sigs.k8s.io/controller-runtime/pkg/controller"
4647 "sigs.k8s.io/controller-runtime/pkg/handler"
@@ -212,55 +213,61 @@ func (r *JsonnetSourceReconciler) SetupWithManager(mgr ctrl.Manager,
212213 WithOptions (controller.Options {
213214 MaxConcurrentReconciles : 5 ,
214215 }).
216+ Watches (& corev1.ConfigMap {},
217+ handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForReference ),
218+ builder .WithPredicates (
219+ ConfigMapPredicates (mgr .GetLogger ().WithValues ("predicate" , "configmappredicate" )),
220+ ),
221+ ).
222+ Watches (& corev1.Secret {},
223+ handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForReference ),
224+ builder .WithPredicates (
225+ SecretPredicates (mgr .GetLogger ().WithValues ("predicate" , "secretpredicate" )),
226+ ),
227+ ).
215228 Build (r )
216229 if err != nil {
217230 return nil , errors .Wrap (err , "error creating controller" )
218231 }
219232
220- // When ConfigMap changes, according to ConfigMapPredicates,
221- // one or more ClusterSummaries need to be reconciled.
222- err = c .Watch (source .Kind (mgr .GetCache (), & corev1.ConfigMap {}),
223- handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForReference ),
224- ConfigMapPredicates (mgr .GetLogger ().WithValues ("predicate" , "configmappredicate" )),
225- )
226- if err != nil {
227- return nil , err
228- }
229-
230- // When Secret changes, according to SecretPredicates,
231- // one or more ClusterSummaries need to be reconciled.
232- err = c .Watch (source .Kind (mgr .GetCache (), & corev1.Secret {}),
233- handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForReference ),
234- SecretPredicates (mgr .GetLogger ().WithValues ("predicate" , "secretpredicate" )),
235- )
236-
237233 return c , err
238234}
239235
240236func (r * JsonnetSourceReconciler ) WatchForFlux (mgr ctrl.Manager , c controller.Controller ) error {
241237 // When a Flux source (GitRepository/OCIRepository/Bucket) changes, one or more ClusterSummaries
242238 // need to be reconciled.
243239
244- err := c .Watch (source .Kind (mgr .GetCache (), & sourcev1.GitRepository {}),
245- handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxSources ),
246- FluxSourcePredicates (r .Scheme , mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )),
240+ sourceGitRepository := source .Kind [* sourcev1.GitRepository ](
241+ mgr .GetCache (),
242+ & sourcev1.GitRepository {},
243+ handler .TypedEnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxGitRepository ),
244+ FluxGitRepositoryPredicate {Logger : mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )},
247245 )
248- if err != nil {
246+ if err := c . Watch ( sourceGitRepository ); err != nil {
249247 return err
250248 }
251249
252- err = c .Watch (source .Kind (mgr .GetCache (), & sourcev1b2.OCIRepository {}),
253- handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxSources ),
254- FluxSourcePredicates (r .Scheme , mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )),
250+ sourceOCIRepository := source .Kind [* sourcev1b2.OCIRepository ](
251+ mgr .GetCache (),
252+ & sourcev1b2.OCIRepository {},
253+ handler .TypedEnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxOCIRepository ),
254+ FluxOCIRepositoryPredicate {Logger : mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )},
255255 )
256- if err != nil {
256+ if err := c . Watch ( sourceOCIRepository ); err != nil {
257257 return err
258258 }
259259
260- return c .Watch (source .Kind (mgr .GetCache (), & sourcev1b2.Bucket {}),
261- handler .EnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxSources ),
262- FluxSourcePredicates (r .Scheme , mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )),
260+ sourceBucket := source .Kind [* sourcev1b2.Bucket ](
261+ mgr .GetCache (),
262+ & sourcev1b2.Bucket {},
263+ handler .TypedEnqueueRequestsFromMapFunc (r .requeueJsonnetSourceForFluxBucket ),
264+ FluxBucketPredicate {Logger : mgr .GetLogger ().WithValues ("predicate" , "fluxsourcepredicate" )},
263265 )
266+ if err := c .Watch (sourceBucket ); err != nil {
267+ return err
268+ }
269+
270+ return nil
264271}
265272
266273func (r * JsonnetSourceReconciler ) getReferenceMapForEntry (entry * corev1.ObjectReference ) * libsveltosset.Set {
@@ -428,12 +435,11 @@ func prepareFileSystemWithFluxSource(ctx context.Context, c client.Client,
428435 return "" , err
429436 }
430437
431- artifactFetcher := fetch .NewArchiveFetcher (
432- 1 ,
433- tar .UnlimitedUntarSize ,
434- tar .UnlimitedUntarSize ,
435- os .Getenv ("SOURCE_CONTROLLER_LOCALHOST" ),
436- )
438+ artifactFetcher := fetch .New (
439+ fetch .WithRetries (1 ),
440+ fetch .WithMaxDownloadSize (tar .UnlimitedUntarSize ),
441+ fetch .WithUntar (tar .WithMaxUntarSize (tar .UnlimitedUntarSize )),
442+ fetch .WithHostnameOverwrite (os .Getenv ("SOURCE_CONTROLLER_LOCALHOST" )))
437443
438444 // Download artifact and extract files to the tmp dir.
439445 err = artifactFetcher .Fetch (fluxSource .GetArtifact ().URL , fluxSource .GetArtifact ().Digest , tmpDir )
0 commit comments