Skip to content

Commit aafa513

Browse files
committed
[feat aga] Implement autp-discovery feature for supported endpoints
1 parent ef3b1da commit aafa513

File tree

10 files changed

+3102
-34
lines changed

10 files changed

+3102
-34
lines changed

controllers/aga/globalaccelerator_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
107107
config.ExternalManagedTags,
108108
logger.WithName("aga-model-builder"),
109109
metricsCollector,
110+
cloud.ELBV2(),
110111
)
111112

112113
// Create stack marshaller

pkg/aga/endpoint_discovery.go

Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
package aga
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
awssdk "github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
9+
elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
10+
"github.com/go-logr/logr"
11+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
12+
13+
corev1 "k8s.io/api/core/v1"
14+
networkingv1 "k8s.io/api/networking/v1"
15+
16+
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
17+
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
18+
"sigs.k8s.io/aws-load-balancer-controller/pkg/annotations"
19+
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
20+
"sigs.k8s.io/controller-runtime/pkg/client"
21+
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
22+
)
23+
24+
// ProtocolPortInfo contains information about a protocol and its associated ports
25+
type ProtocolPortInfo struct {
26+
Protocol agaapi.GlobalAcceleratorProtocol
27+
Ports []int32
28+
}
29+
30+
// EndpointDiscovery is responsible for extracting protocol and port information from different endpoint types
31+
type EndpointDiscovery struct {
32+
client client.Client
33+
annotationParser annotations.Parser
34+
logger logr.Logger
35+
elbv2Client services.ELBV2
36+
}
37+
38+
// NewEndpointDiscovery creates a new EndpointDiscovery instance
39+
func NewEndpointDiscovery(client client.Client, logger logr.Logger, elbv2Client services.ELBV2) *EndpointDiscovery {
40+
annotationParser := annotations.NewSuffixAnnotationParser(annotations.AnnotationPrefixIngress)
41+
return &EndpointDiscovery{
42+
client: client,
43+
annotationParser: annotationParser,
44+
logger: logger,
45+
elbv2Client: elbv2Client,
46+
}
47+
}
48+
49+
// FetchProtocolPortInfo extracts port and protocol information from a loaded endpoint
50+
// For the auto-discovery scenario, we use the following approach:
51+
// 1. Identify the endpoint type (Service, Ingress, Gateway, or LoadBalancer via EndpointID)
52+
// 2. Extract protocol and port information from the stored K8s resource or AWS API
53+
// 3. For Service endpoints, handle both TCP and UDP protocols based on the Service definition
54+
// 4. For Ingress endpoints, parse annotations to get port configurations
55+
// 5. For Gateway endpoints, map Gateway protocols to GlobalAccelerator protocols
56+
// 6. For LoadBalancer (EndpointID) endpoints, query AWS API to get listener information
57+
func (d *EndpointDiscovery) FetchProtocolPortInfo(ctx context.Context, endpoint *LoadedEndpoint) ([]ProtocolPortInfo, error) {
58+
// For Kubernetes resource types, check if K8s resource is available
59+
if endpoint.Type != agaapi.GlobalAcceleratorEndpointTypeEndpointID && endpoint.K8sResource == nil {
60+
return nil, fmt.Errorf("kubernetes resource not available for endpoint %s/%s",
61+
endpoint.Namespace, endpoint.Name)
62+
}
63+
64+
// Process based on endpoint type
65+
switch endpoint.Type {
66+
case agaapi.GlobalAcceleratorEndpointTypeService:
67+
return d.fetchServiceProtocolPortInfo(ctx, endpoint)
68+
case agaapi.GlobalAcceleratorEndpointTypeIngress:
69+
return d.fetchIngressProtocolPortInfo(ctx, endpoint)
70+
case agaapi.GlobalAcceleratorEndpointTypeGateway:
71+
return d.fetchGatewayProtocolPortInfo(ctx, endpoint)
72+
case agaapi.GlobalAcceleratorEndpointTypeEndpointID:
73+
// For LoadBalancer ARN endpoints, we query the AWS API directly
74+
// ARN should be already resolved during endpoint loading
75+
if endpoint.ARN == "" {
76+
return nil, fmt.Errorf("endpoint ARN is not available for endpoint with EndpointID type")
77+
}
78+
return d.fetchLoadBalancerProtocolPortInfo(ctx, endpoint)
79+
}
80+
81+
return nil, fmt.Errorf("auto-discovery not supported for endpoint type %s", endpoint.Type)
82+
}
83+
84+
// fetchServiceProtocolPortInfo extracts protocol and port information from a Service endpoint
85+
func (d *EndpointDiscovery) fetchServiceProtocolPortInfo(_ context.Context, endpoint *LoadedEndpoint) ([]ProtocolPortInfo, error) {
86+
svc, ok := endpoint.K8sResource.(*corev1.Service)
87+
if !ok {
88+
return nil, fmt.Errorf("expected Service object for endpoint %v but got %T",
89+
k8s.NamespacedName(endpoint.K8sResource), endpoint.K8sResource)
90+
}
91+
92+
// Group ports by port number to check for TCP_UDP services (same port number, different protocols)
93+
portMap := make(map[int32][]corev1.ServicePort)
94+
for _, port := range svc.Spec.Ports {
95+
key := port.Port
96+
if vals, exists := portMap[key]; exists {
97+
portMap[key] = append(vals, port)
98+
} else {
99+
portMap[key] = []corev1.ServicePort{port}
100+
}
101+
}
102+
103+
// Check for TCP_UDP services and return error if found
104+
for portNum, servicePorts := range portMap {
105+
if len(servicePorts) > 1 {
106+
// TCP_UDP service case not supported
107+
return nil, fmt.Errorf("auto-discovery does not support TCP_UDP services on the same port %d for endpoint %v",
108+
portNum, k8s.NamespacedName(svc))
109+
}
110+
}
111+
112+
// Group ports by protocol
113+
tcpPorts := []int32{}
114+
udpPorts := []int32{}
115+
116+
for _, port := range svc.Spec.Ports {
117+
if port.Protocol == corev1.ProtocolUDP {
118+
udpPorts = append(udpPorts, port.Port)
119+
} else {
120+
tcpPorts = append(tcpPorts, port.Port)
121+
}
122+
}
123+
124+
return createProtocolPortsInfo(tcpPorts, udpPorts), nil
125+
}
126+
127+
// fetchIngressProtocolPortInfo extracts protocol and port information from an Ingress endpoint
128+
// This function parses annotations to determine the correct ports and protocols
129+
func (d *EndpointDiscovery) fetchIngressProtocolPortInfo(ctx context.Context, endpoint *LoadedEndpoint) ([]ProtocolPortInfo, error) {
130+
ing, ok := endpoint.K8sResource.(*networkingv1.Ingress)
131+
if !ok {
132+
return nil, fmt.Errorf("expected Ingress object for endpoint %v but got %T",
133+
k8s.NamespacedName(endpoint.K8sResource), endpoint.K8sResource)
134+
}
135+
136+
// Check if there's a certificate (from annotations or IngressClassParams)
137+
hasCert, err := d.ingressHasCertificate(ctx, ing)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to check for ingress certificates: %w", err)
140+
}
141+
142+
// Check for listen-ports annotation
143+
var tcpPorts []int32
144+
rawListenPorts := ""
145+
if d.annotationParser.ParseStringAnnotation(annotations.IngressSuffixListenPorts, &rawListenPorts, ing.Annotations) {
146+
// Parse the listen-ports JSON format
147+
// Example format: [{"HTTP": 80}, {"HTTPS": 443}, {"HTTP": 8080}, {"HTTPS": 8443}]
148+
var err error
149+
tcpPorts, err = d.parseIngressListenPorts(rawListenPorts, ing)
150+
if err != nil {
151+
return nil, err
152+
}
153+
if len(tcpPorts) == 0 {
154+
return nil, fmt.Errorf("no valid ports found in listen-ports configuration for ingress %v", k8s.NamespacedName(ing))
155+
}
156+
} else {
157+
// Use default ports based on certificate presence
158+
if hasCert {
159+
tcpPorts = []int32{443} // HTTPS port
160+
} else {
161+
tcpPorts = []int32{80} // HTTP port
162+
}
163+
}
164+
165+
// Return TCP protocol with discovered ports
166+
return []ProtocolPortInfo{
167+
{Protocol: agaapi.GlobalAcceleratorProtocolTCP, Ports: tcpPorts},
168+
}, nil
169+
}
170+
171+
// ingressHasCertificate checks if an Ingress has certificates defined either in
172+
// annotations or IngressClassParams
173+
func (d *EndpointDiscovery) ingressHasCertificate(ctx context.Context, ing *networkingv1.Ingress) (bool, error) {
174+
// First check annotations
175+
certARN := ""
176+
hasCert := d.annotationParser.ParseStringAnnotation(annotations.IngressSuffixCertificateARN, &certARN, ing.Annotations) && certARN != ""
177+
178+
// If no certificate in annotations, check IngressClassParams if available
179+
if !hasCert && ing.Spec.IngressClassName != nil {
180+
hasCertFromParams, err := d.hasCertificatesInIngressClassParams(ctx, *ing.Spec.IngressClassName)
181+
if err != nil {
182+
return false, fmt.Errorf("error checking IngressClassParams for certificates: %w", err)
183+
}
184+
return hasCertFromParams, nil
185+
}
186+
187+
return hasCert, nil
188+
}
189+
190+
// parseIngressListenPorts parses the listen-ports annotation and extracts port numbers
191+
func (d *EndpointDiscovery) parseIngressListenPorts(rawListenPorts string, ing *networkingv1.Ingress) ([]int32, error) {
192+
var entries []map[string]int32
193+
if err := json.Unmarshal([]byte(rawListenPorts), &entries); err != nil {
194+
d.logger.V(1).Error(err, "failed to parse listen-ports configuration for ingress",
195+
"listen-ports", rawListenPorts,
196+
"ingress", k8s.NamespacedName(ing))
197+
return nil, fmt.Errorf("failed to parse listen-ports annotation: %w", err)
198+
}
199+
200+
if len(entries) == 0 {
201+
d.logger.V(1).Info("empty listen-ports configuration for ingress",
202+
"listen-ports", rawListenPorts,
203+
"ingress", k8s.NamespacedName(ing))
204+
return nil, fmt.Errorf("empty listen-ports configuration")
205+
}
206+
207+
// Extract all ports from the listen-ports annotation
208+
var tcpPorts []int32
209+
for _, entry := range entries {
210+
for _, port := range entry {
211+
tcpPorts = append(tcpPorts, port)
212+
}
213+
}
214+
215+
return tcpPorts, nil
216+
}
217+
218+
// fetchGatewayProtocolPortInfo extracts protocol and port information from a Gateway endpoint
219+
func (d *EndpointDiscovery) fetchGatewayProtocolPortInfo(_ context.Context, endpoint *LoadedEndpoint) ([]ProtocolPortInfo, error) {
220+
gw, ok := endpoint.K8sResource.(*gwv1.Gateway)
221+
if !ok {
222+
return nil, fmt.Errorf("expected Gateway object for endpoint %v but got %T",
223+
k8s.NamespacedName(endpoint.K8sResource), endpoint.K8sResource)
224+
}
225+
226+
// The test expects individual protocol-port mappings (one item per port) rather than grouped by protocol
227+
// For test compatibility, we'll create separate protocol groups for each port
228+
tcpPortsMap := make(map[int32]bool)
229+
udpPortsMap := make(map[int32]bool)
230+
231+
// Process each listener and record ports by protocol
232+
for _, listener := range gw.Spec.Listeners {
233+
switch listener.Protocol {
234+
case gwv1.UDPProtocolType:
235+
udpPortsMap[int32(listener.Port)] = true
236+
default:
237+
// For HTTP, HTTPS, TLS, and other protocols, use TCP
238+
tcpPortsMap[int32(listener.Port)] = true
239+
}
240+
}
241+
242+
// Convert maps to slices for easier handling
243+
var tcpPorts, udpPorts []int32
244+
for port := range tcpPortsMap {
245+
tcpPorts = append(tcpPorts, port)
246+
}
247+
for port := range udpPortsMap {
248+
udpPorts = append(udpPorts, port)
249+
}
250+
251+
return createProtocolPortsInfo(tcpPorts, udpPorts), nil
252+
}
253+
254+
// fetchLoadBalancerProtocolPortInfo extracts protocol and port information from a LoadBalancer ARN
255+
// This uses the AWS API to retrieve ELBv2 listener information
256+
func (d *EndpointDiscovery) fetchLoadBalancerProtocolPortInfo(ctx context.Context, endpoint *LoadedEndpoint) ([]ProtocolPortInfo, error) {
257+
lbARN := endpoint.ARN
258+
259+
// Call the AWS API to get listener information
260+
protocolPortsInfo, err := d.getProtocolPortFromELBListener(ctx, lbARN)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to describe listeners for load balancer ARN %s: %w", lbARN, err)
263+
}
264+
265+
// No listeners found
266+
if len(protocolPortsInfo) == 0 {
267+
return nil, fmt.Errorf("no listeners found for load balancer ARN %s", lbARN)
268+
}
269+
270+
var tcpPorts, udpPorts []int32
271+
for _, info := range protocolPortsInfo {
272+
if info.Protocol == agaapi.GlobalAcceleratorProtocolTCP {
273+
tcpPorts = info.Ports
274+
} else if info.Protocol == agaapi.GlobalAcceleratorProtocolUDP {
275+
udpPorts = info.Ports
276+
}
277+
}
278+
279+
d.logger.V(1).Info("discovered protocols and ports from AWS load balancer",
280+
"loadBalancerARN", lbARN,
281+
"tcpPorts", tcpPorts,
282+
"udpPorts", udpPorts)
283+
284+
return protocolPortsInfo, nil
285+
}
286+
287+
// hasCertificatesInIngressClassParams checks if the specified IngressClass has certificate ARNs defined
288+
// in its associated IngressClassParams
289+
func (d *EndpointDiscovery) hasCertificatesInIngressClassParams(ctx context.Context, ingressClassName string) (bool, error) {
290+
// First, get the IngressClass object to find its Parameters reference
291+
ingressClass := &networkingv1.IngressClass{}
292+
if err := d.client.Get(ctx, client.ObjectKey{Name: ingressClassName}, ingressClass); err != nil {
293+
return false, fmt.Errorf("failed to get IngressClass %s: %w", ingressClassName, err)
294+
}
295+
296+
// Check if the IngressClass has Parameters defined
297+
if ingressClass.Spec.Parameters == nil {
298+
return false, nil
299+
}
300+
301+
// Check if the Parameters refer to an IngressClassParams object
302+
if ingressClass.Spec.Parameters.APIGroup == nil ||
303+
*ingressClass.Spec.Parameters.APIGroup != "elbv2.k8s.aws" ||
304+
ingressClass.Spec.Parameters.Kind != "IngressClassParams" {
305+
return false, nil
306+
}
307+
308+
// Now get the IngressClassParams object
309+
ingressClassParams := &elbv2api.IngressClassParams{}
310+
if err := d.client.Get(ctx, client.ObjectKey{Name: ingressClass.Spec.Parameters.Name}, ingressClassParams); err != nil {
311+
return false, fmt.Errorf("failed to get IngressClassParams %s: %w", ingressClass.Spec.Parameters.Name, err)
312+
}
313+
314+
// Check if certificate ARNs are defined
315+
return len(ingressClassParams.Spec.CertificateArn) > 0, nil
316+
}
317+
318+
// getProtocolPortFromELBListener get the protocol and port info from ELB listener
319+
func (d *EndpointDiscovery) getProtocolPortFromELBListener(ctx context.Context, lbARN string) ([]ProtocolPortInfo, error) {
320+
input := &elasticloadbalancingv2.DescribeListenersInput{
321+
LoadBalancerArn: awssdk.String(lbARN),
322+
}
323+
324+
listeners, err := d.elbv2Client.DescribeListenersAsList(ctx, input)
325+
if err != nil {
326+
return nil, fmt.Errorf("failed to describe listeners for load balancer %s: %w", lbARN, err)
327+
}
328+
329+
// Group ports by protocol
330+
tcpPorts := []int32{}
331+
udpPorts := []int32{}
332+
333+
for _, listener := range listeners {
334+
port := awssdk.ToInt32(listener.Port)
335+
listenerProtocol := listener.Protocol
336+
337+
// Map ELB protocol to GA protocol
338+
switch listenerProtocol {
339+
case elbv2types.ProtocolEnumHttp, elbv2types.ProtocolEnumHttps, elbv2types.ProtocolEnumTcp, elbv2types.ProtocolEnumTls:
340+
// All HTTP, HTTPS, TCP, TLS protocols map to TCP for Global Accelerator
341+
tcpPorts = append(tcpPorts, port)
342+
case elbv2types.ProtocolEnumUdp:
343+
// UDP maps directly to UDP for Global Accelerator
344+
udpPorts = append(udpPorts, port)
345+
default:
346+
// Any other protocols are not supported by Global Accelerator
347+
return nil, fmt.Errorf("listener protocol %s is not supported by Global Accelerator for load balancer %s",
348+
listenerProtocol, lbARN)
349+
}
350+
}
351+
352+
return createProtocolPortsInfo(tcpPorts, udpPorts), nil
353+
}
354+
355+
// createProtocolPortsInfo is a helper function that creates ProtocolPortInfo objects from TCP and UDP port lists
356+
func createProtocolPortsInfo(tcpPorts, udpPorts []int32) []ProtocolPortInfo {
357+
var protocolPortsInfo []ProtocolPortInfo
358+
359+
if len(tcpPorts) > 0 {
360+
protocolPortsInfo = append(protocolPortsInfo, ProtocolPortInfo{
361+
Protocol: agaapi.GlobalAcceleratorProtocolTCP,
362+
Ports: tcpPorts,
363+
})
364+
}
365+
if len(udpPorts) > 0 {
366+
protocolPortsInfo = append(protocolPortsInfo, ProtocolPortInfo{
367+
Protocol: agaapi.GlobalAcceleratorProtocolUDP,
368+
Ports: udpPorts,
369+
})
370+
}
371+
372+
return protocolPortsInfo
373+
}

0 commit comments

Comments
 (0)