Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
config.ExternalManagedTags,
logger.WithName("aga-model-builder"),
metricsCollector,
cloud.ELBV2(),
)

// Create stack marshaller
Expand Down
84 changes: 65 additions & 19 deletions controllers/ingress/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (r *groupReconciler) reconcile(ctx context.Context, req reconcile.Request)
return ctrlerrors.NewErrorWithMetrics(controllerName, "add_group_finalizer_error", err, r.metricsCollector)
}

_, lb, frontendNlb, err := r.buildAndDeployModel(ctx, ingGroup)
_, lb, frontendNlb, listenerPorts, err := r.buildAndDeployModel(ctx, ingGroup)
if err != nil {
return err
}
Expand All @@ -173,7 +173,7 @@ func (r *groupReconciler) reconcile(ctx context.Context, req reconcile.Request)
return
}
}
statusErr = r.updateIngressGroupStatus(ctx, ingGroup, lbDNS, frontendNlbDNS)
statusErr = r.updateIngressGroupStatus(ctx, ingGroup, lbDNS, frontendNlbDNS, listenerPorts)
if statusErr != nil {
r.recordIngressGroupEvent(ctx, ingGroup, corev1.EventTypeWarning, k8s.IngressEventReasonFailedUpdateStatus,
fmt.Sprintf("Failed update status due to %v", statusErr))
Expand All @@ -200,25 +200,26 @@ func (r *groupReconciler) reconcile(ctx context.Context, req reconcile.Request)
return nil
}

func (r *groupReconciler) buildAndDeployModel(ctx context.Context, ingGroup ingress.Group) (core.Stack, *elbv2model.LoadBalancer, *elbv2model.LoadBalancer, error) {
func (r *groupReconciler) buildAndDeployModel(ctx context.Context, ingGroup ingress.Group) (core.Stack, *elbv2model.LoadBalancer, *elbv2model.LoadBalancer, []int32, error) {
var stack core.Stack
var lb *elbv2model.LoadBalancer
var secrets []types.NamespacedName
var backendSGRequired bool
var err error
var frontendNlb *elbv2model.LoadBalancer
var listenerPorts []int32
buildModelFn := func() {
stack, lb, secrets, backendSGRequired, frontendNlb, err = r.modelBuilder.Build(ctx, ingGroup, r.metricsCollector)
stack, lb, secrets, backendSGRequired, frontendNlb, listenerPorts, err = r.modelBuilder.Build(ctx, ingGroup, r.metricsCollector)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, "build_model", buildModelFn)
if err != nil {
r.recordIngressGroupEvent(ctx, ingGroup, corev1.EventTypeWarning, k8s.IngressEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
return nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "build_model_error", err, r.metricsCollector)
return nil, nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "build_model_error", err, r.metricsCollector)
}
stackJSON, err := r.stackMarshaller.Marshal(stack)
if err != nil {
r.recordIngressGroupEvent(ctx, ingGroup, corev1.EventTypeWarning, k8s.IngressEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
r.logger.Info("successfully built model", "model", stackJSON)

Expand All @@ -229,10 +230,10 @@ func (r *groupReconciler) buildAndDeployModel(ctx context.Context, ingGroup ingr
if err != nil {
var requeueNeededAfter *ctrlerrors.RequeueNeededAfter
if errors.As(err, &requeueNeededAfter) {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
r.recordIngressGroupEvent(ctx, ingGroup, corev1.EventTypeWarning, k8s.IngressEventReasonFailedDeployModel, fmt.Sprintf("Failed deploy model due to %v", err))
return nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "deploy_model_error", err, r.metricsCollector)
return nil, nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "deploy_model_error", err, r.metricsCollector)
}
r.logger.Info("successfully deployed model", "ingressGroup", ingGroup.ID)
r.secretsManager.MonitorSecrets(ingGroup.ID.String(), secrets)
Expand All @@ -242,9 +243,9 @@ func (r *groupReconciler) buildAndDeployModel(ctx context.Context, ingGroup ingr
inactiveResources = append(inactiveResources, k8s.ToSliceOfNamespacedNames(ingGroup.Members)...)
}
if err := r.backendSGProvider.Release(ctx, networkingpkg.ResourceTypeIngress, inactiveResources); err != nil {
return nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "release_auto_generated_backend_sg_error", err, r.metricsCollector)
return nil, nil, nil, nil, ctrlerrors.NewErrorWithMetrics(controllerName, "release_auto_generated_backend_sg_error", err, r.metricsCollector)
}
return stack, lb, frontendNlb, nil
return stack, lb, frontendNlb, listenerPorts, nil
}

func (r *groupReconciler) recordIngressGroupEvent(_ context.Context, ingGroup ingress.Group, eventType string, reason string, message string) {
Expand All @@ -253,25 +254,35 @@ func (r *groupReconciler) recordIngressGroupEvent(_ context.Context, ingGroup in
}
}

func (r *groupReconciler) updateIngressGroupStatus(ctx context.Context, ingGroup ingress.Group, lbDNS string, frontendNLBDNS string) error {
func (r *groupReconciler) updateIngressGroupStatus(ctx context.Context, ingGroup ingress.Group, lbDNS string, frontendNLBDNS string, listenerPorts []int32) error {
for _, member := range ingGroup.Members {
if err := r.updateIngressStatus(ctx, lbDNS, frontendNLBDNS, member.Ing); err != nil {
if err := r.updateIngressStatus(ctx, lbDNS, frontendNLBDNS, member.Ing, listenerPorts); err != nil {
return err
}
}
return nil
}

func (r *groupReconciler) updateIngressStatus(ctx context.Context, lbDNS string, frontendNlbDNS string, ing *networking.Ingress) error {
func (r *groupReconciler) updateIngressStatus(ctx context.Context, lbDNS string, frontendNlbDNS string, ing *networking.Ingress, ports []int32) error {
ingressPorts := make([]networking.IngressPortStatus, len(ports))
for i, port := range ports {
ingressPorts[i] = networking.IngressPortStatus{
Port: port,
}
}

ingOld := ing.DeepCopy()
if len(ing.Status.LoadBalancer.Ingress) != 1 ||
ing.Status.LoadBalancer.Ingress[0].IP != "" ||
ing.Status.LoadBalancer.Ingress[0].Hostname != lbDNS {
ing.Status.LoadBalancer.Ingress = []networking.IngressLoadBalancerIngress{
{
Hostname: lbDNS,
Ports: ingressPorts,
},
}
} else if len(ports) > 0 {
ing.Status.LoadBalancer.Ingress[0].Ports = ingressPorts
}

// Ensure frontendNLBDNS is appended if it is not already added
Expand Down Expand Up @@ -405,21 +416,56 @@ func isIngressStatusEqual(a, b []networking.IngressLoadBalancerIngress) bool {
return false
}

setA := make(map[string]struct{}, len(a))
setB := make(map[string]struct{}, len(b))
hostnameToPortsA := make(map[string]map[int32]struct{})
hostnameToPortsB := make(map[string]map[int32]struct{})

for _, ingress := range a {
setA[ingress.Hostname] = struct{}{}
if ingress.Hostname == "" {
continue
}

portSet := make(map[int32]struct{})
for _, portStatus := range ingress.Ports {
portSet[portStatus.Port] = struct{}{}
}
hostnameToPortsA[ingress.Hostname] = portSet
}

for _, ingress := range b {
setB[ingress.Hostname] = struct{}{}
if ingress.Hostname == "" {
continue
}

portSet := make(map[int32]struct{})
for _, portStatus := range ingress.Ports {
portSet[portStatus.Port] = struct{}{}
}
hostnameToPortsB[ingress.Hostname] = portSet
}

for key := range setA {
if _, exists := setB[key]; !exists {
// Check if the maps are equal (same hostnames with same ports)
if len(hostnameToPortsA) != len(hostnameToPortsB) {
return false
}

// Check if all hostnames in A exist in B with the same ports
for hostname, portsA := range hostnameToPortsA {
portsB, exists := hostnameToPortsB[hostname]
if !exists {
return false // Hostname in A doesn't exist in B
}

// Check if port sets are equal (same length and same values)
if len(portsA) != len(portsB) {
return false
}

// Check if all ports in A exist in B
for port := range portsA {
if _, exists := portsB[port]; !exists {
return false
}
}
}
return true
}
Expand Down
Loading
Loading