Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions controllers/aga/eventhandlers/resource_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package eventhandlers

import (
"context"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// NewEnqueueRequestsForResourceEvent creates a new handler for generic resource events
func NewEnqueueRequestsForResourceEvent(
resourceType aga.ResourceType,
referenceTracker *aga.ReferenceTracker,
logger logr.Logger,
) handler.EventHandler {
return &enqueueRequestsForResourceEvent{
resourceType: resourceType,
referenceTracker: referenceTracker,
logger: logger,
}
}

// enqueueRequestsForResourceEvent handles resource events and enqueues reconcile requests for GlobalAccelerators
// that reference the resource
type enqueueRequestsForResourceEvent struct {
resourceType aga.ResourceType
referenceTracker *aga.ReferenceTracker
logger logr.Logger
}

// The following methods implement handler.TypedEventHandler interface

// Create handles Create events with the typed API
func (h *enqueueRequestsForResourceEvent) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "created", queue)
}

// Update handles Update events with the typed API
func (h *enqueueRequestsForResourceEvent) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.ObjectNew, "updated", queue)
}

// Delete handles Delete events with the typed API
func (h *enqueueRequestsForResourceEvent) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "deleted", queue)
}

// Generic handles Generic events with the typed API
func (h *enqueueRequestsForResourceEvent) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "generic event", queue)
}

// handleTypedResource handles resource events for the typed interface
func (h *enqueueRequestsForResourceEvent) handleResource(_ context.Context, obj interface{}, eventType string, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
var namespace, name string

// Extract namespace and name based on the object type
switch res := obj.(type) {
case *corev1.Service:
namespace = res.Namespace
name = res.Name
case *networking.Ingress:
namespace = res.Namespace
name = res.Name
case *gwv1.Gateway:
namespace = res.Namespace
name = res.Name
case *unstructured.Unstructured:
namespace = res.GetNamespace()
name = res.GetName()
default:
h.logger.Error(nil, "Unknown resource type", "type", h.resourceType)
return
}

resourceKey := aga.ResourceKey{
Type: h.resourceType,
Name: types.NamespacedName{
Namespace: namespace,
Name: name,
},
}

// If this resource is not referenced by any GA, no need to queue reconciles
if !h.referenceTracker.IsResourceReferenced(resourceKey) {
return
}

// Get all GAs that reference this resource
gaRefs := h.referenceTracker.GetGAsForResource(resourceKey)

// Queue reconcile for affected GAs
for _, gaRef := range gaRefs {
h.logger.V(1).Info("Enqueueing GA for reconcile due to resource event",
"resourceType", h.resourceType,
"resourceName", resourceKey.Name,
"eventType", eventType,
"ga", gaRef)

queue.Add(reconcile.Request{NamespacedName: gaRef})
}
}
156 changes: 148 additions & 8 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/controllers/aga/eventhandlers"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
Expand All @@ -50,6 +53,7 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
gwclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

const (
Expand Down Expand Up @@ -83,7 +87,7 @@ const (
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {

// Create tracking provider
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(config.AWSConfig.Region))
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(cloud.Region()))

// Create model builder
agaModelBuilder := aga.NewDefaultModelBuilder(
Expand All @@ -92,7 +96,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
trackingProvider,
config.FeatureGates,
config.ClusterName,
config.AWSConfig.Region,
cloud.Region(),
config.DefaultTags,
config.ExternalManagedTags,
logger.WithName("aga-model-builder"),
Expand All @@ -108,6 +112,18 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
// Create status updater
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)

// Create reference tracker for endpoint tracking
referenceTracker := aga.NewReferenceTracker(logger.WithName("reference-tracker"))

// Create DNS resolver
dnsResolver, err := aga.NewDNSResolver(cloud.ELBV2())
if err != nil {
logger.Error(err, "Failed to create DNS resolver")
}

// Create unified endpoint loader
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsResolver, logger.WithName("endpoint-loader"))

return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
Expand All @@ -120,6 +136,13 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
metricsCollector: metricsCollector,
reconcileTracker: reconcileCounters.IncrementAGA,

// Components for endpoint reference tracking
referenceTracker: referenceTracker,
dnsResolver: dnsResolver,

// Unified endpoint loader
endpointLoader: endpointLoader,

maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
}
Expand All @@ -138,6 +161,21 @@ type globalAcceleratorReconciler struct {
metricsCollector lbcmetrics.MetricCollector
reconcileTracker func(namespaceName ktypes.NamespacedName)

// Components for endpoint reference tracking
referenceTracker *aga.ReferenceTracker
dnsResolver *aga.DNSResolver

// Unified endpoint loader
endpointLoader aga.EndpointLoader

// Resources manager for dedicated endpoint resource watchers
endpointResourcesManager aga.EndpointResourcesManager

// Event channels for dedicated watchers
serviceEventChan chan event.GenericEvent
ingressEventChan chan event.GenericEvent
gatewayEventChan chan event.GenericEvent

maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
}
Expand Down Expand Up @@ -194,6 +232,13 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con

func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
// Clean up references in the reference tracker
gaKey := k8s.NamespacedName(ga)
r.referenceTracker.RemoveGA(gaKey)

// Clean up resource watches
r.endpointResourcesManager.RemoveGA(gaKey)

// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
Expand Down Expand Up @@ -224,6 +269,29 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi

func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))

// Get all endpoints from GA
endpoints := aga.GetAllEndpointsFromGA(ga)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider calling this GetDesiredEndpointsFromGA; so that it's obvious these are coming from the spec and not from the current state of the accelerator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes makes sense


// Track referenced endpoints
r.referenceTracker.UpdateReferencesForGA(ga, endpoints)

// Update resource watches with the endpointResourcesManager
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)

// Validate and load endpoint status using the endpoint loader
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
if len(fatalErrors) > 0 {
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
r.logger.Error(err, fmt.Sprintf("fatal error loading endpoints for %v", k8s.NamespacedName(ga)))
// Handle other endpoint loading errors
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.EndpointLoadFailed, err.Error()); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after endpoint load failure")
}
return err
}

var stack core.Stack
var accelerator *agamodel.Accelerator
var err error
Expand All @@ -232,6 +300,8 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GatewayEventReasonFailedBuildModel, fmt.Sprintf("Failed to build model: %v", err))
r.logger.Error(err, fmt.Sprintf("Failed to build model for: %v", k8s.NamespacedName(ga)))
// Update status to indicate model building failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
Expand All @@ -246,7 +316,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))

r.logger.Error(err, fmt.Sprintf("Failed to deploy stack for: %v", k8s.NamespacedName(ga)))
// Update status to indicate deployment failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
Expand Down Expand Up @@ -335,21 +405,91 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
return nil
}

if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
// Create event channels for dedicated watchers
r.serviceEventChan = make(chan event.GenericEvent)
r.ingressEventChan = make(chan event.GenericEvent)
r.gatewayEventChan = make(chan event.GenericEvent)

// Initialize Gateway API client using the same config
gwClient, err := gwclientset.NewForConfig(mgr.GetConfig())
if err != nil {
r.logger.Error(err, "Failed to create Gateway API client")
return err
}

// TODO: Add event handlers for Services, Ingresses, and Gateways
// that are referenced by GlobalAccelerator endpoints
// Initialize the endpoint resources manager with clients
r.endpointResourcesManager = aga.NewEndpointResourcesManager(
clientSet,
gwClient,
r.serviceEventChan,
r.ingressEventChan,
r.gatewayEventChan,
r.logger.WithName("endpoint-resources-manager"),
)

if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
// Set up the controller builder
ctrl, err := ctrl.NewControllerManagedBy(mgr).
For(&agaapi.GlobalAccelerator{}).
Named(controllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
}).
Complete(r)
Build(r)

if err != nil {
return err
}

// Setup watches for resource events
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
return err
}

return nil
}

// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
loggerPrefix := r.logger.WithName("eventHandlers")

// Create handlers for our dedicated watchers
serviceHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.ServiceResourceType,
r.referenceTracker,
loggerPrefix.WithName("service-handler"),
)

ingressHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.IngressResourceType,
r.referenceTracker,
loggerPrefix.WithName("ingress-handler"),
)

gatewayHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.GatewayResourceType,
r.referenceTracker,
loggerPrefix.WithName("gateway-handler"),
)

// Add watches using the channel sources with event handlers
if err := c.Watch(source.Channel(r.serviceEventChan, serviceHandler)); err != nil {
return err
}

if err := c.Watch(source.Channel(r.ingressEventChan, ingressHandler)); err != nil {
return err
}

if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
return err
}

return nil
}

func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru v1.0.2
github.com/onsi/ginkgo/v2 v2.23.3
github.com/onsi/gomega v1.37.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -148,6 +149,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw=
github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
Expand Down
Loading
Loading