From bce464b43b2a06d888c7b7b64e3df28caa0990e7 Mon Sep 17 00:00:00 2001
From: Michael Weibel
Date: Fri, 5 Dec 2025 17:31:54 +0100
Subject: [PATCH] feat: add nodeselector annotation to limit LB pool members

---
 README.md                                 |   1 +
 examples/nginx-hello-nodeselector.yml     |  57 +++++
 pkg/cloudscale_ccm/cloud.go               |  23 +-
 pkg/cloudscale_ccm/loadbalancer.go        |  72 +++++-
 pkg/cloudscale_ccm/reconcile.go           |  20 +-
 pkg/cloudscale_ccm/reconcile_test.go      |   4 +-
 pkg/cloudscale_ccm/service_info.go        |   2 +
 pkg/internal/integration/service_test.go  | 277 ++++++++++++++++++-----
 pkg/internal/testkit/http.go              |   5 +-
 9 files changed, 376 insertions(+), 85 deletions(-)
 create mode 100644 examples/nginx-hello-nodeselector.yml

diff --git a/README.md b/README.md
index 7675126..50c5161 100644
--- a/README.md
+++ b/README.md
@@ -429,6 +429,7 @@ Changes to the following annotations causes pools to be recreated and cause an e
 - `k8s.cloudscale.ch/loadbalancer-pool-algorithm`
 - `k8s.cloudscale.ch/loadbalancer-pool-protocol`
 - `k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets`
+- `k8s.cloudscale.ch/loadbalancer-node-selector`
 
 Additionally, changes to `spec.externalTrafficPolicy` have the same effect.
 
diff --git a/examples/nginx-hello-nodeselector.yml b/examples/nginx-hello-nodeselector.yml
new file mode 100644
index 0000000..b9049d1
--- /dev/null
+++ b/examples/nginx-hello-nodeselector.yml
@@ -0,0 +1,57 @@
+# Deploys the docker.io/nginxdemos/hello:plain-text container and creates a
+# loadbalancer service with a node-selector annotation for it:
+#
+# export KUBECONFIG=path/to/kubeconfig
+# kubectl apply -f nginx-hello-nodeselector.yml
+#
+# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
+# then use the IP address found under "LoadBalancer Ingress" to connect to the
+# service.
+#
+# You can also use the following shortcut:
+#
+# curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
+#
+# If you follow the nginx log, you will see that nginx sees a cluster internal
+# IP address as source of requests:
+#
+# kubectl logs -l "app=hello"
+#
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hello
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: hello
+  template:
+    metadata:
+      labels:
+        app: hello
+    spec:
+      containers:
+        - name: hello
+          image: docker.io/nginxdemos/hello:plain-text
+      nodeSelector:
+        kubernetes.io/hostname: k8test-worker-2
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: hello
+  annotations:
+    k8s.cloudscale.ch/loadbalancer-node-selector: "kubernetes.io/hostname=k8test-worker-2"
+  name: hello
+spec:
+  ports:
+    - port: 80
+      protocol: TCP
+      targetPort: 80
+      name: http
+  selector:
+    app: hello
+  type: LoadBalancer
diff --git a/pkg/cloudscale_ccm/cloud.go b/pkg/cloudscale_ccm/cloud.go
index 9ea001e..45c1d4b 100644
--- a/pkg/cloudscale_ccm/cloud.go
+++ b/pkg/cloudscale_ccm/cloud.go
@@ -9,9 +9,13 @@ import (
     "strings"
     "time"
 
-    cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
+    "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
     "golang.org/x/oauth2"
+    corev1 "k8s.io/api/core/v1"
     "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/kubernetes/scheme"
+    v1 "k8s.io/client-go/kubernetes/typed/core/v1"
+    "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
 
     cloudprovider "k8s.io/cloud-provider"
@@ -32,6 +36,9 @@ const (
 type cloud struct {
     instances    *instances
     loadbalancer *loadbalancer
+
+    eventBroadcaster record.EventBroadcaster
+    eventRecorder    record.EventRecorder
 }
 
 // Register this provider with Kubernetes.
@@ -114,6 +121,20 @@ func (c *cloud) Initialize( // in newCloudscaleClient c.loadbalancer.k8s = kubernetes.NewForConfigOrDie( clientBuilder.ConfigOrDie("cloudscale-cloud-controller-manager")) + + clientset := clientBuilder.ClientOrDie("cloud-controller-manager") + + c.eventBroadcaster = record.NewBroadcaster() + c.eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{ + Interface: clientset.CoreV1().Events(""), + }) + c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, + corev1.EventSource{ + Component: "cloud-provider-cloudscale", + }, + ) + + c.loadbalancer.recorder = c.eventRecorder } // LoadBalancer returns a balancer interface. Also returns true if the diff --git a/pkg/cloudscale_ccm/loadbalancer.go b/pkg/cloudscale_ccm/loadbalancer.go index 82ee99c..129203c 100644 --- a/pkg/cloudscale_ccm/loadbalancer.go +++ b/pkg/cloudscale_ccm/loadbalancer.go @@ -2,7 +2,6 @@ package cloudscale_ccm import ( "context" - "errors" "fmt" "slices" "strings" @@ -11,7 +10,9 @@ import ( "github.com/cloudscale-ch/cloudscale-go-sdk/v6" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" ) @@ -208,7 +209,7 @@ const ( // connections timing out while the monitor is updated. LoadBalancerHealthMonitorTimeoutS = "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s" - // LoadBalancerHealthMonitorDownThreshold is the number of the checks that + // LoadBalancerHealthMonitorUpThreshold is the number of the checks that // need to succeed before a pool member is considered up. Defaults to 2. LoadBalancerHealthMonitorUpThreshold = "k8s.cloudscale.ch/loadbalancer-health-monitor-up-threshold" @@ -278,7 +279,7 @@ const ( // Changing this annotation on an established service is considered safe. LoadBalancerListenerTimeoutMemberDataMS = "k8s.cloudscale.ch/loadbalancer-timeout-member-data-ms" - // LoadBalancerSubnetLimit is a JSON list of subnet UUIDs that the + // LoadBalancerListenerAllowedSubnets is a JSON list of subnet UUIDs that the // loadbalancer should use. By default, all subnets of a node are used: // // * `[]` means that anyone is allowed to connect (default). @@ -291,12 +292,17 @@ const ( // This is an advanced feature, useful if you have nodes that are in // multiple private subnets. LoadBalancerListenerAllowedSubnets = "k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets" + + // LoadBalancerNodeSelector can be set to restrict which nodes are added to the LB pool. + // It accepts a standard Kubernetes label selector string. 
+ LoadBalancerNodeSelector = "k8s.cloudscale.ch/loadbalancer-node-selector" ) type loadbalancer struct { - lbs lbMapper - srv serverMapper - k8s kubernetes.Interface + lbs lbMapper + srv serverMapper + k8s kubernetes.Interface + recorder record.EventRecorder } // GetLoadBalancer returns whether the specified load balancer exists, and @@ -387,16 +393,23 @@ func (l *loadbalancer) EnsureLoadBalancer( return nil, err } - // Refuse to do anything if there are no nodes + nodes, err := filterNodesBySelector(serviceInfo, nodes) + if err != nil { + return nil, err + } + if len(nodes) == 0 { - return nil, errors.New( - "no valid nodes for service found, please verify there is " + - "at least one that allows load balancers", + l.recorder.Event( + service, + v1.EventTypeWarning, + "NoValidNodes", + "No valid nodes for service found, "+ + "double-check node-selector annotation", ) } // Reconcile - err := reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { + err = reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { // Get the desired state from Kubernetes servers, err := l.srv.mapNodes(ctx, nodes).All() if err != nil { @@ -442,6 +455,28 @@ func (l *loadbalancer) EnsureLoadBalancer( return result, nil } +func filterNodesBySelector( + serviceInfo *serviceInfo, + nodes []*v1.Node, +) ([]*v1.Node, error) { + selector := labels.Everything() + if v := serviceInfo.annotation(LoadBalancerNodeSelector); v != "" { + var err error + selector, err = labels.Parse(v) + if err != nil { + return nil, fmt.Errorf("unable to parse selector: %w", err) + } + } + selectedNodes := make([]*v1.Node, 0, len(nodes)) + for _, node := range nodes { + if selector.Matches(labels.Set(node.Labels)) { + selectedNodes = append(selectedNodes, node) + } + } + + return selectedNodes, nil +} + // UpdateLoadBalancer updates hosts under the specified load balancer. // Implementations must treat the *v1.Service and *v1.Node // parameters as read-only and not modify them. @@ -461,6 +496,21 @@ func (l *loadbalancer) UpdateLoadBalancer( return err } + nodes, err := filterNodesBySelector(serviceInfo, nodes) + if err != nil { + return err + } + + if len(nodes) == 0 { + l.recorder.Event( + service, + v1.EventTypeWarning, + "NoValidNodes", + "No valid nodes for service found, "+ + "double-check node-selector annotation", + ) + } + // Reconcile return reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { // Get the desired state from Kubernetes diff --git a/pkg/cloudscale_ccm/reconcile.go b/pkg/cloudscale_ccm/reconcile.go index a387c86..ef7596a 100644 --- a/pkg/cloudscale_ccm/reconcile.go +++ b/pkg/cloudscale_ccm/reconcile.go @@ -22,16 +22,13 @@ type lbState struct { // Pool pointers are used to refer to members by pool, therefore use a // pointer here as well, to not accidentally copy the struct. - pools []*cloudscale.LoadBalancerPool - members map[*cloudscale.LoadBalancerPool][]cloudscale. - LoadBalancerPoolMember - monitors map[*cloudscale.LoadBalancerPool][]cloudscale. - LoadBalancerHealthMonitor + pools []*cloudscale.LoadBalancerPool + members map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerPoolMember + monitors map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor // Though not currently used that way, listeners are not // necessarily bound to any given pool. - listeners map[*cloudscale.LoadBalancerPool][]cloudscale. 
- LoadBalancerListener + listeners map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener // The assigned floating IPs floatingIPs []string @@ -201,15 +198,6 @@ func desiredLbState( } } - // If there are no pool members, return an error. It would be possible - // to just put a load balancer up that has no function, but it seems - // more useful to err instead, as there's likely something wrong. - if len(s.members[&pool]) == 0 { - return nil, fmt.Errorf( - "service %s: no private address found on any node", - serviceInfo.Service.Name) - } - // Add a health monitor for each pool monitor, err := healthMonitorForPort(serviceInfo) if err != nil { diff --git a/pkg/cloudscale_ccm/reconcile_test.go b/pkg/cloudscale_ccm/reconcile_test.go index 84c9e9a..b9a9f32 100644 --- a/pkg/cloudscale_ccm/reconcile_test.go +++ b/pkg/cloudscale_ccm/reconcile_test.go @@ -1156,10 +1156,10 @@ func TestLimitSubnets(t *testing.T) { assert.Equal(t, "10.0.1.1", state.members[state.pools[0]][0].Address) assert.Equal(t, "10.0.1.2", state.members[state.pools[0]][1].Address) - // If we have no valid addresses, we get an error + // If we have no valid addresses, we get no error s.Annotations[LoadBalancerListenerAllowedSubnets] = ` ["00000000-0000-0000-0000-000000000003"]` _, err = desiredLbState(i, nodes, servers) - assert.Error(t, err) + assert.NoError(t, err) } diff --git a/pkg/cloudscale_ccm/service_info.go b/pkg/cloudscale_ccm/service_info.go index 5fe5eee..e4a51f2 100644 --- a/pkg/cloudscale_ccm/service_info.go +++ b/pkg/cloudscale_ccm/service_info.go @@ -118,6 +118,8 @@ func (s serviceInfo) annotation(key string) string { return s.annotationOrDefault(key, "50000") case LoadBalancerListenerAllowedSubnets: return s.annotationOrDefault(key, "[]") + case LoadBalancerNodeSelector: + return s.annotationOrDefault(key, "") default: return s.annotationOrElse(key, func() string { klog.Warning("unknown annotation:", key) diff --git a/pkg/internal/integration/service_test.go b/pkg/internal/integration/service_test.go index fb7f1d3..54eb61e 100644 --- a/pkg/internal/integration/service_test.go +++ b/pkg/internal/integration/service_test.go @@ -28,14 +28,7 @@ import ( ) func (s *IntegrationTestSuite) CreateDeployment( - name string, image string, replicas int32, protocol v1.Protocol, port int32, args ...string) { - - var command []string - - if len(args) > 0 { - command = args[:1] - args = args[1:] - } + name string, image string, replicas int32, protocol v1.Protocol, port int32, options ...func(*appsv1.DeploymentSpec)) { spec := appsv1.DeploymentSpec{ Replicas: &replicas, @@ -53,10 +46,8 @@ func (s *IntegrationTestSuite) CreateDeployment( Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: name, - Image: image, - Command: command, - Args: args, + Name: name, + Image: image, Ports: []v1.ContainerPort{ {ContainerPort: port, Protocol: protocol}, }, @@ -66,6 +57,10 @@ func (s *IntegrationTestSuite) CreateDeployment( }, } + for _, opt := range options { + opt(&spec) + } + _, err := s.k8s.AppsV1().Deployments(s.ns).Create( context.Background(), &appsv1.Deployment{ @@ -100,25 +95,33 @@ type ServicePortSpec struct { TargetPort int32 } -func (s *IntegrationTestSuite) ExposeDeployment( - name string, annotations map[string]string, ports ...ServicePortSpec) { - - servicePorts := make([]v1.ServicePort, len(ports)) - for i, p := range ports { - servicePorts[i] = v1.ServicePort{ - Name: fmt.Sprintf("port%d", i), - Protocol: p.Protocol, - Port: p.Port, - TargetPort: intstr.FromInt32(p.TargetPort), +func WithServicePort(sp 
ServicePortSpec) func(spec *v1.ServiceSpec) {
+    return func(spec *v1.ServiceSpec) {
+        if spec.Ports == nil {
+            spec.Ports = make([]v1.ServicePort, 0)
         }
+
+        spec.Ports = append(spec.Ports, v1.ServicePort{
+            Name:       fmt.Sprintf("port%d", len(spec.Ports)),
+            Protocol:   sp.Protocol,
+            Port:       sp.Port,
+            TargetPort: intstr.FromInt32(sp.TargetPort),
+        })
     }
+}
+
+func (s *IntegrationTestSuite) ExposeDeployment(
+    name string, annotations map[string]string, options ...func(spec *v1.ServiceSpec)) {
 
     spec := v1.ServiceSpec{
         Type: v1.ServiceTypeLoadBalancer,
         Selector: map[string]string{
             "app": name,
         },
-        Ports: servicePorts,
+    }
+
+    for _, f := range options {
+        f(&spec)
     }
 
     service, err := s.k8s.CoreV1().Services(s.ns).Get(
@@ -308,11 +311,11 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() {
 
     // Deploy a TCP server that returns the hostname
     s.T().Log("Creating nginx deployment")
-    s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80)
+    s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80)
 
     // Expose the deployment using a LoadBalancer service
     s.ExposeDeployment("nginx", nil,
-        ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})
+        WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}))
 
     // Wait for the service to be ready
     s.T().Log("Waiting for nginx service to be ready")
@@ -366,6 +369,172 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() {
     s.Assert().NotContains(lines, "Warn")
 }
 
+// TestServiceEndToEndNodeSelector tests whether annotating a service with loadbalancer-node-selector works as intended.
+//
+// It first deploys nginx on the k8test-worker-2 node and exposes it with a service whose annotation selects that node as well.
+// After verifying that nginx is reachable, the test changes the annotation to k8test-worker-1 and verifies that nginx can no longer be reached.
+//
+// This depends on setting `externalTrafficPolicy: Local` on the service, since otherwise the request would just be forwarded to the other node.
+func (s *IntegrationTestSuite) TestServiceEndToEndNodeSelector() { + start := time.Now() + + s.T().Log("Creating nginx deployment") + s.CreateDeployment("nginx-selected", "docker.io/nginxdemos/hello:plain-text", 1, v1.ProtocolTCP, 80, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.NodeSelector = map[string]string{ + "kubernetes.io/hostname": "k8test-worker-2", + } + }) + + s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: "kubernetes.io/hostname=k8test-worker-2"}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}), + func(spec *v1.ServiceSpec) { + spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + + // Wait for the service to be ready + s.T().Log("Waiting for nginx service to be ready") + service := s.AwaitServiceReady("nginx-selected", 180*time.Second) + s.Require().NotNil(service) + + // Ensure the annotations are set + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerUUID]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerConfigVersion]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerZone]) + + // Ensure we have two public IP addresses + s.Require().Len(service.Status.LoadBalancer.Ingress, 2) + addr := service.Status.LoadBalancer.Ingress[0].IP + + response, err := testkit.HelloNginx(addr, 80) + s.Assert().NoError(err, "request failed") + if s.Assert().NotNil(response, "response is empty") { + s.Assert().NotEmpty(response.ServerName) + } + + // In this simple case we expect no errors nor warnings + s.T().Log("Checking log output for errors/warnings") + lines := s.CCMLogs(start) + + s.Assert().NotContains(lines, "error") + s.Assert().NotContains(lines, "Error") + s.Assert().NotContains(lines, "warn") + s.Assert().NotContains(lines, "Warn") + + // adjust LoadBalancer to worker-1 + // and ensure we can't reach nginx anymore since it's on another node and externalTrafficPolicy is Local. + + s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: "kubernetes.io/hostname=k8test-worker-1"}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}), + func(spec *v1.ServiceSpec) { + spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + + // Wait for the service to be ready + s.T().Log("Waiting for nginx service to be ready") + service = s.AwaitServiceReady("nginx-selected", 180*time.Second) + s.Require().NotNil(service) + + // Ensure the annotations are set + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerUUID]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerConfigVersion]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerZone]) + + // verify only one pool member exists, abort test if there was an error checking this. 
+    if !s.Assert().NoError(s.assertLBPoolMembersLen(service, 1)) {
+        return
+    }
+
+    // Ensure we have two public IP addresses
+    s.Require().Len(service.Status.LoadBalancer.Ingress, 2)
+    addr = service.Status.LoadBalancer.Ingress[0].IP
+
+    response, err = testkit.HelloNginx(addr, 80)
+    s.Assert().Error(err, "request successful")
+    s.Assert().Nil(response, "response is empty")
+
+    // In this simple case we expect no errors nor warnings
+    s.T().Log("Checking log output for errors/warnings")
+    lines = s.CCMLogs(start)
+
+    s.Assert().NotContains(lines, "error")
+    s.Assert().NotContains(lines, "Error")
+    s.Assert().NotContains(lines, "warn")
+    s.Assert().NotContains(lines, "Warn")
+
+    // adjust LoadBalancer to a selector that matches no nodes
+    // and ensure we can't reach nginx anymore since the pool has no members left.
+
+    s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: "kubernetes.io/hostname=notexisting"},
+        WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}),
+        func(spec *v1.ServiceSpec) {
+            spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
+        })
+
+    // Wait for the service to be ready
+    s.T().Log("Waiting for nginx service to be ready")
+    service = s.AwaitServiceReady("nginx-selected", 180*time.Second)
+    s.Require().NotNil(service)
+
+    // Ensure the annotations are set
+    s.Assert().NotEmpty(
+        service.Annotations[cloudscale_ccm.LoadBalancerUUID])
+    s.Assert().NotEmpty(
+        service.Annotations[cloudscale_ccm.LoadBalancerConfigVersion])
+    s.Assert().NotEmpty(
+        service.Annotations[cloudscale_ccm.LoadBalancerZone])
+
+    // verify no pool members exist, abort the test if there was an error checking this.
+    if !s.Assert().NoError(s.assertLBPoolMembersLen(service, 0)) {
+        return
+    }
+
+    // Ensure we have two public IP addresses
+    s.Require().Len(service.Status.LoadBalancer.Ingress, 2)
+    addr = service.Status.LoadBalancer.Ingress[0].IP
+
+    response, err = testkit.HelloNginx(addr, 80)
+    s.Assert().Error(err, "request successful")
+    s.Assert().Nil(response, "response is empty")
+
+    // In this simple case we expect no errors nor warnings
+    s.T().Log("Checking log output for errors/warnings")
+    lines = s.CCMLogs(start)
+
+    s.Assert().NotContains(lines, "error")
+    s.Assert().NotContains(lines, "Error")
+    s.Assert().NotContains(lines, "warn")
+    s.Assert().NotContains(lines, "Warn")
+}
+
+// assertLBPoolMembersLen fetches the loadbalancer pool members and asserts that their count equals the given length.
+func (s *IntegrationTestSuite) assertLBPoolMembersLen(service *v1.Service, len int) error { + pools, err := s.api.LoadBalancerPools.List(s.T().Context()) + if err != nil { + return fmt.Errorf("unable to list loadbalancer pools: %w", err) + } + + var pool cloudscale.LoadBalancerPool + for _, p := range pools { + if p.LoadBalancer.UUID == service.Annotations[cloudscale_ccm.LoadBalancerUUID] { + pool = p + break + } + } + members, err := s.api.LoadBalancerPoolMembers.List(s.T().Context(), pool.UUID) + if err != nil { + return fmt.Errorf("unable to list pool members: %w", err) + } + + s.Assert().Len(members, len) + return nil +} + func (s *IntegrationTestSuite) TestServiceEndToEndUDP() { // Note the start for the log @@ -373,19 +542,17 @@ func (s *IntegrationTestSuite) TestServiceEndToEndUDP() { // Deploy a UDP echo server s.T().Log("Creating udp-echo deployment") - s.CreateDeployment("udp-echo", "docker.io/alpine/socat", 2, v1.ProtocolUDP, 5353, - "socat", - "-v", - "UDP4-RECVFROM:5353,fork", - "SYSTEM:echo 'I could tell you a UDP joke, but you might not get it...',pipes", - ) + s.CreateDeployment("udp-echo", "docker.io/alpine/socat", 2, v1.ProtocolUDP, 5353, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.Containers[0].Command = []string{"socat"} + spec.Template.Spec.Containers[0].Args = []string{"-v", "UDP4-RECVFROM:5353,fork", "SYSTEM:echo 'I could tell you a UDP joke, but you might not get it...',pipes"} + }) // Expose the deployment using a LoadBalancer service with UDP annotations s.ExposeDeployment("udp-echo", map[string]string{ "k8s.cloudscale.ch/loadbalancer-health-monitor-type": "udp-connect", "k8s.cloudscale.ch/loadbalancer-health-monitor-delay-s": "3", "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s": "2", - }, ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 5000, TargetPort: 5353}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 5000, TargetPort: 5353})) // Wait for the service to be ready s.T().Log("Waiting for udp-echo service to be ready") @@ -546,8 +713,8 @@ func (s *IntegrationTestSuite) TestServiceEndToEndDualProtocol() { s.Require().NoError(err) s.ExposeDeployment("dns-server", nil, - ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 53, TargetPort: 53}, - ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 53, TargetPort: 53}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 53, TargetPort: 53}), + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 53, TargetPort: 53}), ) // Wait for the service to be ready @@ -661,13 +828,13 @@ func (s *IntegrationTestSuite) TestServiceVIPAddresses() { // Deploy a TCP server that returns something s.T().Log("Creating foo deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-vip-addresses": fmt.Sprintf( `[{"subnet": "%s"}]`, subnet), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Waiting for nginx service to be ready") service := s.AwaitServiceReady("nginx", 180*time.Second) @@ -763,7 +930,7 @@ func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() { } // Expose the deployment using a LoadBalancer service - s.ExposeDeployment("peeraddr", nil, ServicePortSpec{Protocol: 
v1.ProtocolTCP, Port: 80, TargetPort: 3000}) + s.ExposeDeployment("peeraddr", nil, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 3000})) // Wait for the service to be ready s.T().Log("Waiting for peeraddr service to be ready") @@ -827,13 +994,13 @@ func (s *IntegrationTestSuite) RunTestServiceWithFloatingIP( // Deploy a TCP server that returns the hostname s.T().Log("Creating nginx deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service with Floating IP s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, fip.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for nginx service to be ready") @@ -889,13 +1056,13 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() { // Deploy a TCP server that returns the hostname s.T().Log("Creating nginx deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service with Floating IP s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, regional.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for nginx service to be ready") @@ -908,7 +1075,7 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() { s.ExposeDeployment("service-2", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, regional.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for a moment before checking the log time.Sleep(5 * time.Second) @@ -929,13 +1096,19 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { // Deploy our http-echo server to check for proxy connections s.T().Log("Creating http-echo deployment", "branch", branch) - s.CreateDeployment("http-echo", "golang", 2, v1.ProtocolTCP, 80, "bash", "-c", fmt.Sprintf(` - git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm; - cd ccm; - git checkout %s || exit 1; - cd cmd/http-echo; - go run main.go -host 0.0.0.0 -port 80 - `, branch)) + s.CreateDeployment("http-echo", "docker.io/golang", 2, v1.ProtocolTCP, 80, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.Containers[0].Command = []string{"bash"} + spec.Template.Spec.Containers[0].Args = []string{ + "-c", + fmt.Sprintf(` + git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm; + cd ccm; + git checkout %s || exit 1; + cd cmd/http-echo; + go run main.go -host 0.0.0.0 -port 80 + `, branch), + } + }) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("http-echo", map[string]string{ @@ -944,7 +1117,7 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { // Make sure to get the default behavior of older Kubernetes releases, // even 
on newer releases. "k8s.cloudscale.ch/loadbalancer-ip-mode": "VIP", - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for http-echo service to be ready") @@ -992,7 +1165,7 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { "%s.cust.cloudscale.ch", strings.ReplaceAll(addr, ".", "-"), ), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Testing PROXY protocol from inside with workaround") used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url) @@ -1005,7 +1178,7 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { if newer { s.ExposeDeployment("http-echo", map[string]string{ "k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy", - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Testing PROXY protocol on newer Kubernetes releases") used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url) diff --git a/pkg/internal/testkit/http.go b/pkg/internal/testkit/http.go index 9658654..c01cf40 100644 --- a/pkg/internal/testkit/http.go +++ b/pkg/internal/testkit/http.go @@ -19,9 +19,8 @@ type HelloResponse struct { } func HelloNginx(addr string, port uint16) (*HelloResponse, error) { - body, err := HTTPRead( - "http://" + net.JoinHostPort(addr, strconv.FormatUint( - uint64(port), 10))) + body, err := HTTPRead("http://" + net.JoinHostPort(addr, strconv.FormatUint( + uint64(port), 10))) if err != nil { return nil, err }
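
For readers unfamiliar with the selector syntax accepted by the new
k8s.cloudscale.ch/loadbalancer-node-selector annotation: filterNodesBySelector
parses the value with k8s.io/apimachinery/pkg/labels, so the usual Kubernetes
label-selector expressions apply. A minimal sketch, outside the diff above, of
how such a value is evaluated (the node labels and the set-based example key
are made up for illustration):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        // Equality-based and set-based requirements are both accepted, e.g.
        //   "kubernetes.io/hostname=k8test-worker-2"
        //   "node-role.example.com/lb-member,topology.kubernetes.io/zone in (lpg1)"
        selector, err := labels.Parse("kubernetes.io/hostname=k8test-worker-2")
        if err != nil {
            panic(err)
        }

        // A node's labels, represented as a labels.Set (map[string]string).
        node := labels.Set{"kubernetes.io/hostname": "k8test-worker-2"}

        // Matches reports whether the node's labels satisfy the selector.
        fmt.Println(selector.Matches(node)) // true

        // An empty annotation falls back to labels.Everything(),
        // which matches every node.
        fmt.Println(labels.Everything().Matches(node)) // true
    }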