Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 160 additions & 12 deletions cloudstack_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,23 @@ const (
ServiceAnnotationLoadBalancerProxyProtocol = "service.beta.kubernetes.io/cloudstack-load-balancer-proxy-protocol"
ServiceAnnotationLoadBalancerLoadbalancerHostname = "service.beta.kubernetes.io/cloudstack-load-balancer-hostname"
ServiceAnnotationLoadBalancerSourceCidrs = "service.beta.kubernetes.io/cloudstack-load-balancer-source-cidrs"

ServiceAnnotationLoadBalancerStickynessMethodName = "service.beta.kubernetes.io/cloudstack-load-balancer-stickyness-method-name"
ServiceAnnotationLoadBalancerStickynessParam = "service.beta.kubernetes.io/cloudstack-load-balancer-stickyness-method-param"
)

type loadBalancer struct {
*cloudstack.CloudStackClient

name string
algorithm string
hostIDs []string
ipAddr string
ipAddrID string
networkID string
projectID string
rules map[string]*cloudstack.LoadBalancerRule
name string
algorithm string
hostIDs []string
ipAddr string
ipAddrID string
networkID string
projectID string
rules map[string]*cloudstack.LoadBalancerRule
stickynessPolicies map[string]*cloudstack.LBStickinessPolicyStickinesspolicy
}

// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
Expand Down Expand Up @@ -161,12 +165,36 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s
// Delete the rule from the map, to prevent it being deleted.
delete(lb.rules, lbRuleName)
}

stickynessPolicy, stickynessPolicyNeedsUpdate, err := lb.checkStickynessPolicy(lbRule, service)
if err != nil {
return nil, err
}
if stickynessPolicyNeedsUpdate {
if stickynessPolicy != nil {
klog.V(4).Infof("Recreate stickyness policy: %v", lbRuleName)
if err := lb.deleteStickynessPolicy(stickynessPolicy.Id); err != nil {
return nil, err
}
delete(lb.stickynessPolicies, lbRule.Id)
} else {
klog.V(4).Infof("Creating stickyness policy: %v", lbRuleName)
}
if _, err := lb.createStickynessPolicy(lbRuleName, lbRule.Id, service); err != nil {
return nil, err
}
// Remove from map to mark as handled (map tracks initial state for comparison)
delete(lb.stickynessPolicies, lbRule.Id)
}
} else {
klog.V(4).Infof("Creating load balancer rule: %v", lbRuleName)
lbRule, err = lb.createLoadBalancerRule(lbRuleName, port, protocol, service)
if err != nil {
return nil, err
}
if _, err := lb.createStickynessPolicy(lbRuleName, lbRule.Id, service); err != nil {
return nil, err
}

klog.V(4).Infof("Assigning hosts (%v) to load balancer rule: %v", lb.hostIDs, lbRuleName)
if err = lb.assignHostsToRule(lbRule, lb.hostIDs); err != nil {
Expand Down Expand Up @@ -372,10 +400,11 @@ func (cs *CSCloud) GetLoadBalancerName(ctx context.Context, clusterName string,
// getLoadBalancer retrieves the IP address and ID and all the existing rules it can find.
func (cs *CSCloud) getLoadBalancer(service *corev1.Service) (*loadBalancer, error) {
lb := &loadBalancer{
CloudStackClient: cs.client,
name: cs.GetLoadBalancerName(context.TODO(), "", service),
projectID: cs.projectID,
rules: make(map[string]*cloudstack.LoadBalancerRule),
CloudStackClient: cs.client,
name: cs.GetLoadBalancerName(context.TODO(), "", service),
projectID: cs.projectID,
rules: make(map[string]*cloudstack.LoadBalancerRule),
stickynessPolicies: make(map[string]*cloudstack.LBStickinessPolicyStickinesspolicy),
}

p := cs.client.LoadBalancer.NewListLoadBalancerRulesParams()
Expand All @@ -400,6 +429,16 @@ func (cs *CSCloud) getLoadBalancer(service *corev1.Service) (*loadBalancer, erro

lb.ipAddr = lbRule.Publicip
lb.ipAddrID = lbRule.Publicipid

lbStickinessPoliciesParams := cs.client.LoadBalancer.NewListLBStickinessPoliciesParams()
lbStickinessPoliciesParams.SetLbruleid(lbRule.Id)
lbStickinessPolicies, err := cs.client.LoadBalancer.ListLBStickinessPolicies(lbStickinessPoliciesParams)
if err != nil {
return nil, fmt.Errorf("error retrieving stickyness policies: %v", err)
}
if len(lbStickinessPolicies.LBStickinessPolicies) > 0 {
lb.stickynessPolicies[lbRule.Id] = &lbStickinessPolicies.LBStickinessPolicies[0].Stickinesspolicy[0]
}
}

klog.V(4).Infof("Load balancer %v contains %d rule(s)", lb.name, len(lb.rules))
Expand Down Expand Up @@ -561,6 +600,60 @@ func (lb *loadBalancer) releaseLoadBalancerIP() error {
return nil
}

func (lb *loadBalancer) checkStickynessPolicy(lbRule *cloudstack.LoadBalancerRule, service *corev1.Service) (*cloudstack.LBStickinessPolicyStickinesspolicy, bool, error) {
stickynessPolicy := lb.stickynessPolicies[lbRule.Id]
stickynessMethodName := getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerStickynessMethodName, "")
stickynessMethodParam := getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerStickynessParam, "")
stickynessMethodParams := parseStickynessParams(stickynessMethodParam)

// If no policy exists and no method name is specified, no action needed
if stickynessPolicy == nil {
if stickynessMethodName == "" {
return nil, false, nil
}
klog.V(4).Infof("sticky policy not found for rule: %v", lbRule.Name)
return nil, true, nil
}

// If policy exists but method name is not specified, policy should be deleted
if stickynessMethodName == "" {
klog.V(4).Infof("sticky policy exists but annotation removed for rule: %v", lbRule.Name)
return stickynessPolicy, true, nil
}

// Policy exists and method name is specified - check if it matches
klog.V(4).Infof("sticky policy found for rule: %v", lbRule.Name)
if stickynessPolicy.Methodname != stickynessMethodName {
klog.V(4).Infof("sticky policy method name does not match: %v", lbRule.Name)
return stickynessPolicy, true, nil
}

// Check if params match
if len(stickynessPolicy.Params) != len(stickynessMethodParams) {
klog.V(4).Infof("sticky policy params length does not match: %v", lbRule.Name)
return stickynessPolicy, true, nil
}

// Check if all keys in stickynessPolicy.Params match stickynessMethodParams
for key, value := range stickynessPolicy.Params {
if stickynessMethodParams[key] != value {
klog.V(4).Infof("sticky policy param %v does not match: %v", key, value)
return stickynessPolicy, true, nil
}
}

// Check if all keys in stickynessMethodParams exist in stickynessPolicy.Params
for key := range stickynessMethodParams {
if _, exists := stickynessPolicy.Params[key]; !exists {
klog.V(4).Infof("sticky policy missing param: %v", key)
return stickynessPolicy, true, nil
}
}

// Policy matches desired state
return stickynessPolicy, false, nil
}

// checkLoadBalancerRule checks if the rule already exists and if it does, if it can be updated. If
// it does exist but cannot be updated, it will delete the existing rule so it can be created again.
func (lb *loadBalancer) checkLoadBalancerRule(lbRuleName string, port corev1.ServicePort, protocol LoadBalancerProtocol) (*cloudstack.LoadBalancerRule, bool, error) {
Expand Down Expand Up @@ -596,6 +689,43 @@ func (lb *loadBalancer) updateLoadBalancerRule(lbRuleName string, protocol LoadB
return err
}

// createStickynessPolicy creates a new stickyness policy and returns it.
func (lb *loadBalancer) createStickynessPolicy(lbRuleName string, lbRuleId string, service *corev1.Service) (*cloudstack.LBStickinessPolicyStickinesspolicy, error) {
stickynessMethodName := getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerStickynessMethodName, "")
stickynessMethodParam := getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerStickynessParam, "")
// If the stickyness method name is not set, we don't need to create a stickyness policy.
if stickynessMethodName == "" {
return nil, nil
}
p := lb.LoadBalancer.NewCreateLBStickinessPolicyParams(lbRuleId, stickynessMethodName, lbRuleName)

params := parseStickynessParams(stickynessMethodParam)
p.SetParam(params)

stickynessPolicy, err := lb.LoadBalancer.CreateLBStickinessPolicy(p)
if err != nil {
return nil, fmt.Errorf("error creating stickyness policy: %v", err)
}
// return &stickynessPolicy.Stickinesspolicy[0].Stickinesspolicy, nil
return &cloudstack.LBStickinessPolicyStickinesspolicy{
Methodname: stickynessPolicy.Stickinesspolicy[0].Methodname,
Params: stickynessPolicy.Stickinesspolicy[0].Params,
Id: stickynessPolicy.Stickinesspolicy[0].Id,
Name: stickynessPolicy.Stickinesspolicy[0].Name,
State: stickynessPolicy.Stickinesspolicy[0].State,
}, nil
}

// deleteStickynessPolicy deletes a stickyness policy.
func (lb *loadBalancer) deleteStickynessPolicy(stickynessPolicyId string) error {
p := lb.LoadBalancer.NewDeleteLBStickinessPolicyParams(stickynessPolicyId)

if _, err := lb.LoadBalancer.DeleteLBStickinessPolicy(p); err != nil {
return fmt.Errorf("error deleting stickyness policy %v: %v", stickynessPolicyId, err)
}
return nil
}

// createLoadBalancerRule creates a new load balancer rule and returns it's ID.
func (lb *loadBalancer) createLoadBalancerRule(lbRuleName string, port corev1.ServicePort, protocol LoadBalancerProtocol, service *corev1.Service) (*cloudstack.LoadBalancerRule, error) {
p := lb.LoadBalancer.NewCreateLoadBalancerRuleParams(
Expand Down Expand Up @@ -663,6 +793,7 @@ func (lb *loadBalancer) deleteLoadBalancerRule(lbRule *cloudstack.LoadBalancerRu

// Delete the rule from the map as it no longer exists
delete(lb.rules, lbRule.Name)
delete(lb.stickynessPolicies, lbRule.Id)

return nil
}
Expand Down Expand Up @@ -1027,6 +1158,23 @@ func getStringFromServiceAnnotation(service *corev1.Service, annotationKey strin
return defaultSetting
}

// parseStickynessParams parses a comma-separated string of key=value pairs into a map.
// Empty values and malformed entries are ignored.
func parseStickynessParams(paramString string) map[string]string {
params := make(map[string]string)
for _, param := range strings.Split(paramString, ",") {
param = strings.TrimSpace(param)
if param == "" {
continue
}
parts := strings.SplitN(param, "=", 2)
if len(parts) == 2 {
params[parts[0]] = parts[1]
}
}
return params
}

// getBoolFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's boolean value or a specified defaultSetting
func getBoolFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting bool) bool {
klog.V(4).Infof("getBoolFromServiceAnnotation(%s/%s, %v, %v)", service.Namespace, service.Name, annotationKey, defaultSetting)
Expand Down
Loading