Skip to content

Commit df1744c

Browse files
authored
[feat aga] Implement endpoint management for endpoint groups in accelerator (#4471)
* [feat aga] Implement endpoints builder filtering for partially loaded endpoints * [feat aga] Implement endpoints deployer with requeue logic for partially loaded endpoints
1 parent fe9140f commit df1744c

11 files changed

+1723
-37
lines changed

controllers/aga/globalaccelerator_controller.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,15 @@ const (
6464
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
6565
globalAcceleratorKind = "GlobalAccelerator"
6666

67-
// Requeue constants for provisioning state monitoring
68-
requeueMessage = "Monitoring provisioning state"
69-
statusUpdateRequeueTime = 1 * time.Minute
67+
// Requeue constants for state monitoring
68+
// requeueReasonAcceleratorInProgress indicates that the reconciliation is being requeued because
69+
// the Global Accelerator is still in the IN_PROGRESS state
70+
requeueReasonAcceleratorInProgress = "Waiting for Global Accelerator %s with status 'IN_PROGRESS' to complete"
71+
72+
// requeueReasonEndpointsInWarningState indicates that the reconciliation is being requeued because
73+
// there are endpoints in warning state that need to be periodically rechecked
74+
// The %s verb is filled with the GlobalAccelerator's namespaced name. The endpoints being
// retried are the ones that did NOT load successfully (warning state).
requeueReasonEndpointsInWarningState = "Retrying endpoints for Global Accelerator %s which did not load successfully - will check availability again soon"
75+
statusUpdateRequeueTime = 1 * time.Minute
7076

7177
// Metric stage constants
7278
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
@@ -251,8 +257,8 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Conte
251257
return nil
252258
}
253259

254-
func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator) (core.Stack, *agamodel.Accelerator, error) {
255-
stack, accelerator, err := r.modelBuilder.Build(ctx, ga)
260+
func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi.GlobalAccelerator, loadedEndpoints []*aga.LoadedEndpoint) (core.Stack, *agamodel.Accelerator, error) {
261+
stack, accelerator, err := r.modelBuilder.Build(ctx, ga, loadedEndpoints)
256262
if err != nil {
257263
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
258264
return nil, nil, err
@@ -279,7 +285,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
279285
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)
280286

281287
// Validate and load endpoint status using the endpoint loader
282-
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
288+
loadedEndpoints, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
283289
if len(fatalErrors) > 0 {
284290
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
285291
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
@@ -295,7 +301,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
295301
var accelerator *agamodel.Accelerator
296302
var err error
297303
buildModelFn := func() {
298-
stack, accelerator, err = r.buildModel(ctx, ga)
304+
stack, accelerator, err = r.buildModel(ctx, ga, loadedEndpoints)
299305
}
300306
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
301307
if err != nil {
@@ -326,14 +332,37 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
326332

327333
r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())
328334

329-
// Update GlobalAccelerator status after successful deployment
335+
// Check whether any loaded endpoint is in warning status
336+
hasWarningEndpoints := false
337+
for _, ep := range loadedEndpoints {
338+
if ep.Status == aga.EndpointStatusWarning {
339+
hasWarningEndpoints = true
340+
}
341+
}
342+
343+
// Update GlobalAccelerator status after successful deployment, including warning endpoints
330344
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
331345
if err != nil {
332346
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
333347
return err
334348
}
335-
if requeueNeeded {
336-
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
349+
350+
// If we have warning endpoints, log and emit a warning event for them; the requeue itself is issued below
351+
if hasWarningEndpoints {
352+
r.logger.V(1).Info("Detected endpoints in warning state, will requeue",
353+
"Global Accelerator", k8s.NamespacedName(ga))
354+
355+
// Add event to notify about warning endpoints
356+
warningMessage := fmt.Sprintf("Detected endpoints which did not load successfully. These endpoints will be rechecked shortly.")
357+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonWarningEndpoints, warningMessage)
358+
}
359+
360+
if requeueNeeded || hasWarningEndpoints {
361+
message := fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga))
362+
if hasWarningEndpoints {
363+
message = fmt.Sprintf(requeueReasonEndpointsInWarningState, k8s.NamespacedName(ga))
364+
}
365+
return ctrlerrors.NewRequeueNeededAfter(message, statusUpdateRequeueTime)
337366
}
338367

339368
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
@@ -379,7 +408,7 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx cont
379408
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
380409
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
381410
}
382-
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
411+
return ctrlerrors.NewRequeueNeeded(fmt.Sprintf(requeueReasonAcceleratorInProgress, k8s.NamespacedName(ga)))
383412
}
384413

385414
// Any other error

pkg/aga/model_build_endpoint_group.go

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
awssdk "github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/go-logr/logr"
78
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
89
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
910
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
@@ -12,27 +13,35 @@ import (
1213
// endpointGroupBuilder builds EndpointGroup model resources
1314
type endpointGroupBuilder interface {
1415
// Build builds all endpoint groups for all listeners
15-
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error)
16+
Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
17+
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)
1618

1719
// buildEndpointGroupsForListener builds endpoint groups for a specific listener
18-
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error)
20+
buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener,
21+
endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int,
22+
loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error)
1923
}
2024

2125
// NewEndpointGroupBuilder constructs new endpointGroupBuilder
22-
func NewEndpointGroupBuilder(clusterRegion string) endpointGroupBuilder {
26+
func NewEndpointGroupBuilder(clusterRegion string, gaNamespace string, logger logr.Logger) endpointGroupBuilder {
2327
return &defaultEndpointGroupBuilder{
2428
clusterRegion: clusterRegion,
29+
gaNamespace: gaNamespace,
30+
logger: logger,
2531
}
2632
}
2733

2834
var _ endpointGroupBuilder = &defaultEndpointGroupBuilder{}
2935

3036
// defaultEndpointGroupBuilder is the default implementation of endpointGroupBuilder.
type defaultEndpointGroupBuilder struct {
3137
// clusterRegion is supplied via NewEndpointGroupBuilder.
// NOTE(review): presumably the fallback region for endpoint groups; its use is outside this view — confirm.
clusterRegion string
38+
// gaNamespace is the namespace applied to endpoints that do not set their own Namespace
// when lookup keys are generated for matching against loaded endpoints.
gaNamespace string
39+
// logger reports endpoints that are skipped while building endpoint configurations.
logger logr.Logger
3240
}
3341

3442
// Build builds EndpointGroup model resources
35-
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener, listenerConfigs []agaapi.GlobalAcceleratorListener) ([]*agamodel.EndpointGroup, error) {
43+
func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stack, listeners []*agamodel.Listener,
44+
listenerConfigs []agaapi.GlobalAcceleratorListener, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
3645
if listeners == nil || len(listeners) == 0 {
3746
return nil, nil
3847
}
@@ -51,7 +60,7 @@ func (b *defaultEndpointGroupBuilder) Build(ctx context.Context, stack core.Stac
5160
continue
5261
}
5362

54-
listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i)
63+
listenerEndpointGroups, err := b.buildEndpointGroupsForListener(ctx, stack, listener, *listenerConfig.EndpointGroups, i, loadedEndpoints)
5564
if err != nil {
5665
return nil, err
5766
}
@@ -114,11 +123,13 @@ func (b *defaultEndpointGroupBuilder) validateEndpointPortOverridesWithinListene
114123
}
115124

116125
// buildEndpointGroupsForListener builds EndpointGroup models for a specific listener
117-
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack, listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup, listenerIndex int) ([]*agamodel.EndpointGroup, error) {
126+
func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context.Context, stack core.Stack,
127+
listener *agamodel.Listener, endpointGroups []agaapi.GlobalAcceleratorEndpointGroup,
128+
listenerIndex int, loadedEndpoints []*LoadedEndpoint) ([]*agamodel.EndpointGroup, error) {
118129
var result []*agamodel.EndpointGroup
119130

120131
for i, endpointGroup := range endpointGroups {
121-
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup)
132+
spec, err := b.buildEndpointGroupSpec(ctx, listener, endpointGroup, loadedEndpoints)
122133
if err != nil {
123134
return nil, err
124135
}
@@ -132,7 +143,9 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupsForListener(ctx context
132143
}
133144

134145
// buildEndpointGroupSpec builds the EndpointGroupSpec for a single EndpointGroup model resource
135-
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context, listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup) (agamodel.EndpointGroupSpec, error) {
146+
func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context,
147+
listener *agamodel.Listener, endpointGroup agaapi.GlobalAcceleratorEndpointGroup,
148+
loadedEndpoints []*LoadedEndpoint) (agamodel.EndpointGroupSpec, error) {
136149
region, err := b.determineRegion(endpointGroup)
137150
if err != nil {
138151
return agamodel.EndpointGroupSpec{}, err
@@ -146,14 +159,90 @@ func (b *defaultEndpointGroupBuilder) buildEndpointGroupSpec(ctx context.Context
146159
return agamodel.EndpointGroupSpec{}, err
147160
}
148161

162+
// Build endpoint configurations from both static configurations and loaded endpoints
163+
endpointConfigurations, err := b.buildEndpointConfigurations(ctx, endpointGroup, loadedEndpoints)
164+
if err != nil {
165+
return agamodel.EndpointGroupSpec{}, err
166+
}
167+
149168
return agamodel.EndpointGroupSpec{
150-
ListenerARN: listener.ListenerARN(),
151-
Region: region,
152-
TrafficDialPercentage: trafficDialPercentage,
153-
PortOverrides: portOverrides,
169+
ListenerARN: listener.ListenerARN(),
170+
Region: region,
171+
TrafficDialPercentage: trafficDialPercentage,
172+
PortOverrides: portOverrides,
173+
EndpointConfigurations: endpointConfigurations,
154174
}, nil
155175
}
156176

177+
// generateEndpointKey creates a consistent string key for endpoint lookup.
//
// For endpoints of type GlobalAcceleratorEndpointTypeEndpointID the key is
// "<type>/<endpointID>". For every other type the key is "<type>/<namespace>/<name>",
// where namespace is ep.Namespace when it is set and gaNamespace otherwise.
//
// NOTE(review): callers look these keys up in a map keyed by LoadedEndpoint.GetKey();
// presumably both produce the same format — verify against GetKey.
178+
func generateEndpointKey(ep agaapi.GlobalAcceleratorEndpoint, gaNamespace string) string {
179+
// Default to the GlobalAccelerator's namespace unless the endpoint names its own.
namespace := gaNamespace
180+
if ep.Namespace != nil {
181+
namespace = awssdk.ToString(ep.Namespace)
182+
}
183+
name := awssdk.ToString(ep.Name)
184+
185+
// EndpointID-typed endpoints are keyed by their ID only; namespace and name are not used.
if ep.Type == agaapi.GlobalAcceleratorEndpointTypeEndpointID {
186+
return fmt.Sprintf("%s/%s", ep.Type, awssdk.ToString(ep.EndpointID))
187+
}
188+
return fmt.Sprintf("%s/%s/%s", ep.Type, namespace, name)
189+
}
190+
191+
// buildEndpointConfigurations builds the endpoint configurations for one endpoint group by matching
192+
// the endpoints declared in the API struct against the loaded endpoints.
//
// Only declared endpoints whose matching loaded endpoint has status EndpointStatusLoaded are
// returned; the others are logged and skipped. The context argument is unused, and the returned
// error is always nil in the current implementation. The result is nil when the group declares
// no endpoints or when no declared endpoint qualifies.
193+
func (b *defaultEndpointGroupBuilder) buildEndpointConfigurations(_ context.Context,
194+
endpointGroup agaapi.GlobalAcceleratorEndpointGroup, loadedEndpoints []*LoadedEndpoint) ([]agamodel.EndpointConfiguration, error) {
195+
196+
var endpointConfigurations []agamodel.EndpointConfiguration
197+
198+
// Skip if no endpoints are defined in the endpoint group
199+
if endpointGroup.Endpoints == nil {
200+
return nil, nil
201+
}
202+
203+
// Build a map of loaded endpoints for quick lookup by key.
// If two loaded endpoints share a key, the later one replaces the earlier one.
204+
loadedEndpointsMap := make(map[string]*LoadedEndpoint)
205+
for _, le := range loadedEndpoints {
206+
key := le.GetKey()
207+
loadedEndpointsMap[key] = le
208+
209+
}
210+
211+
// Process the endpoints defined in the CRD and match them with loaded endpoints
212+
for _, ep := range *endpointGroup.Endpoints {
213+
// Create the lookup key; endpoints without a namespace fall back to b.gaNamespace
214+
lookupKey := generateEndpointKey(ep, b.gaNamespace)
215+
216+
// Find the loaded endpoint
217+
if loadedEndpoint, found := loadedEndpointsMap[lookupKey]; found {
218+
// Add the endpoint to the model only if it is in Loaded status.
// Only the status is checked here; the ARN is used as-is.
219+
if loadedEndpoint.Status == EndpointStatusLoaded {
220+
// Create a base configuration with the loaded endpoint's ARN
221+
endpointConfig := agamodel.EndpointConfiguration{
222+
EndpointID: loadedEndpoint.ARN,
223+
}
224+
// Weight comes from the loaded endpoint; client IP preservation comes from the CRD entry.
endpointConfig.Weight = awssdk.Int32(loadedEndpoint.Weight)
225+
endpointConfig.ClientIPPreservationEnabled = ep.ClientIPPreservationEnabled
226+
endpointConfigurations = append(endpointConfigurations, endpointConfig)
227+
} else {
228+
// Log (at info level) and skip endpoints whose status is anything other than Loaded
229+
b.logger.Info("Endpoint not added to endpoint group as no valid ARN was found during loading",
230+
"endpoint", lookupKey,
231+
"message", loadedEndpoint.Message,
232+
"error", loadedEndpoint.Error)
233+
}
234+
} else {
// A declared endpoint with no loaded counterpart is logged and skipped, not treated as an error.
235+
b.logger.Info("Endpoint not found in loaded endpoints",
236+
"endpoint", lookupKey)
237+
}
238+
}
239+
240+
return endpointConfigurations, nil
241+
}
242+
243+
// Note: The TargetsEndpointGroup method is no longer needed since we match endpoints based on
244+
// the explicit references in the GlobalAcceleratorEndpoint resources under each endpoint group
245+
157246
// validateListenerPortOverrideWithinListenerPortRanges ensures all listener ports used in port overrides are
158247
// contained within the listener's port ranges
159248
func (b *defaultEndpointGroupBuilder) validateListenerPortOverrideWithinListenerPortRanges(listener *agamodel.Listener, portOverrides []agamodel.PortOverride) error {

0 commit comments

Comments
 (0)