e2e/loadbalancer: added hairpin connection cases #1161

Open · wants to merge 1 commit into base: master
315 changes: 286 additions & 29 deletions tests/e2e/loadbalancer.go
@@ -16,12 +16,16 @@ package e2e
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
@@ -43,6 +47,10 @@ var (
clusterNodesSelector string
clusterNodesCount int = 0

// clusterNodeSingleWorker is a single worker node used by test cases.
clusterNodeSingleWorker string
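// clusterNodesMutex serializes worker node discovery across test cases.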
clusterNodesMutex sync.Mutex

// lookupNodeSelectors are valid compute/node/worker selectors commonly used in different kubernetes
// distributions.
lookupNodeSelectors = []string{
@@ -71,11 +79,14 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
})

type loadBalancerTestCases struct {
Name string
ResourceSuffix string
Annotations map[string]string
PostConfigService func(cfg *configServiceLB, svc *v1.Service)
PostRunValidation func(cfg *configServiceLB, svc *v1.Service)
Name string
ResourceSuffix string
Annotations map[string]string
PostConfigService func(cfg *configServiceLB, svc *v1.Service)
PostRunValidation func(cfg *configServiceLB, svc *v1.Service)
RemoteTestReachableHTTP bool // run the HTTP reachability check from a pod inside the cluster (hairpin path)
RequireAffinity bool // pin the backend pod to the single discovered worker node
ExpectTestFail bool // tolerate a failing in-cluster reachability check (known issue)
}
cases := []loadBalancerTestCases{
{
@@ -93,29 +104,10 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
{
Name: "NLB should configure the loadbalancer with target-node-labels",
ResourceSuffix: "sg-nd",
Annotations: map[string]string{
annotationLBType: "nlb",
},
Annotations: map[string]string{annotationLBType: "nlb"},
PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
// discover clusterNodeSelector and patch service
// TODO: move to external function if there are more scenarios to discover nodes.
By("discovering node label used in the kubernetes distributions")
for _, selector := range lookupNodeSelectors {
nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: selector,
})
framework.ExpectNoError(err, "failed to list worker nodes")
if len(nodeList.Items) > 0 {
clusterNodesCount = len(nodeList.Items)
clusterNodesSelector = selector
break
}
}

if clusterNodesCount == 0 {
framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
}

discoverClusterWorkerNode(cs)
By(fmt.Sprintf("found %d nodes with selector %q\n", clusterNodesCount, clusterNodesSelector))
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
@@ -132,6 +124,75 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, clusterNodesCount), "AWS LB target count validation failed")
},
},
{
Name: "internet-facing should support hairpin connection",
ResourceSuffix: "hairpin-clb",
Annotations: map[string]string{},
PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
discoverClusterWorkerNode(cs)
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
}
svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
framework.Logf("Using service annotations: %v", svc.Annotations)
},
RemoteTestReachableHTTP: true,
},
{
Name: "NLB internet-facing should support hairpin connection",
ResourceSuffix: "hairpin-nlb",
Annotations: map[string]string{annotationLBType: "nlb"},
PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
discoverClusterWorkerNode(cs)
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
}
svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
framework.Logf("Using service annotations: %v", svc.Annotations)
},
RemoteTestReachableHTTP: true,
RequireAffinity: true,
},
{
Name: "internal should support hairpin connection",
ResourceSuffix: "hp-clb-int",
Annotations: map[string]string{
"service.beta.kubernetes.io/aws-load-balancer-internal": "true",
},
PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
discoverClusterWorkerNode(cs)
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
}
svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
framework.Logf("Using service annotations: %v", svc.Annotations)
},
RemoteTestReachableHTTP: true,
RequireAffinity: true,
},
// FIXME: https://github.com/kubernetes/cloud-provider-aws/issues/1160
// Hairpin connections work with target type instance only when client IP preservation is disabled.
// Currently the CCM does not provide an interface to create a service with that setup, causing the
// internal NLB service test to fail.
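// Note (illustrative): the out-of-tree aws-load-balancer-controller can disable client IP preservation
// through its service.beta.kubernetes.io/aws-load-balancer-target-group-attributes annotation
// (e.g. "preserve_client_ip.enabled=false"); no equivalent interface exists in the in-tree provider
// exercised here.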
{
Name: "NLB internal should support hairpin connection",
ResourceSuffix: "hp-nlb-int",
Annotations: map[string]string{
annotationLBType: "nlb",
"service.beta.kubernetes.io/aws-load-balancer-internal": "true",
},
PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
discoverClusterWorkerNode(cs)
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
}
svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
framework.Logf("Using service annotations: %v", svc.Annotations)
},
RemoteTestReachableHTTP: true,
RequireAffinity: true,
ExpectTestFail: true,
},
}

serviceNameBase := "lbconfig-test"
@@ -150,6 +211,8 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name)
lbConfig := newConfigServiceLB()
lbConfig.LBJig = e2eservice.NewTestJig(cs, ns.Name, serviceName)

// Hook annotations to support dynamic config
lbServiceConfig := lbConfig.buildService(tc.Annotations)

// Hook: PostConfigService patches the service configuration.
@@ -169,7 +232,7 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {

// Run Workloads
By("creating a pod to be part of the TCP service " + serviceName)
_, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController())
_, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController(tc.RequireAffinity))
framework.ExpectNoError(err)

// Hook: PostRunValidation performs LB validations after it is created (before test).
@@ -190,7 +253,11 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0])
framework.Logf("Load balancer's ingress IP: %s", ingressIP)

e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
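// Hairpin cases must be exercised from a pod on the same node as the backend target;
// an external reachability check would not traverse the hairpin path.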
if tc.RemoteTestReachableHTTP {
framework.ExpectNoError(inClusterTestReachableHTTP(cs, ns.Name, clusterNodeSingleWorker, ingressIP, svcPort, tc.ExpectTestFail))
} else {
e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
}

// Update the service to cluster IP
By("changing TCP service back to type=ClusterIP")
@@ -281,10 +348,11 @@ func (s *configServiceLB) buildService(extraAnnotations map[string]string) *v1.S
// when the test framework is updated.
// [1] https://github.com/kubernetes/kubernetes/blob/89d95c9713a8fd189e8ad555120838b3c4f888d1/test/e2e/framework/service/jig.go#L636
// [2] https://github.com/kubernetes/kubernetes/issues/119021
func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationController) {
func (s *configServiceLB) buildReplicationController(affinity bool) func(rc *v1.ReplicationController) {
return func(rc *v1.ReplicationController) {
var replicas int32 = 1
var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
rc.ObjectMeta = metav1.ObjectMeta{
Namespace: s.LBJig.Namespace,
Name: s.LBJig.Name,
@@ -322,6 +390,25 @@ func (s *configServiceLB) buildReplicationCo
},
},
}
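// Pin the backend pod to the discovered worker node so the in-cluster client pod, scheduled
// on the same node, exercises the hairpin path through the load balancer.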
if affinity {
rc.Spec.Template.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{clusterNodeSingleWorker},
},
},
},
},
},
},
}
}
}
}

@@ -400,3 +487,173 @@ func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int
}
return nil
}

// discoverClusterWorkerNode discovers the worker node selector used by the current kubernetes
// distribution, recording the node count and a single worker node for use by test cases.
func discoverClusterWorkerNode(cs clientset.Interface) {
clusterNodesMutex.Lock()
defer clusterNodesMutex.Unlock()
// Skip when already discovered.
if len(clusterNodesSelector) > 0 {
return
}

var workerNodeList []string
By("discovering node label used in the kubernetes distributions")
for _, selector := range lookupNodeSelectors {
nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: selector,
})
framework.ExpectNoError(err, "failed to list worker nodes")
if len(nodeList.Items) > 0 {
clusterNodesCount = len(nodeList.Items)
clusterNodesSelector = selector
for _, node := range nodeList.Items {
workerNodeList = append(workerNodeList, node.Name)
}
break
}
}

// Fail when no worker node is found, which is the purpose of this function.
if clusterNodesCount == 0 {
framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
}

// Save the first worker node (sorted alphabetically) to be used by test cases.
if len(workerNodeList) > 0 {
sort.Strings(workerNodeList)
clusterNodeSingleWorker = workerNodeList[0]
}
}

// inClusterTestReachableHTTP creates a pod to test HTTP connectivity to the load balancer from
// inside the cluster. It schedules the pod on the same node as the backend using node affinity.
func inClusterTestReachableHTTP(cs clientset.Interface, namespace, nodeName, targetIP string, targetPort int, expectFailTest bool) error {
podName := "http-test-pod"

// client http test (curl) pod spec.
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "curl",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Command: []string{"curl"},
Args: []string{
"--retry", "20",
"--retry-delay", "15",
"--retry-max-time", "480",
"--retry-all-errors",
"--trace-time",
"-w", "\\\"\\n---> HTTPCode=%{http_code} Time=%{time_total}ms <---\\n\\\"",
fmt.Sprintf("http://%s:%d/echo?msg=hello", targetIP, targetPort),
},
},
},
SecurityContext: &v1.PodSecurityContext{
RunAsNonRoot: aws.Bool(true),
SeccompProfile: &v1.SeccompProfile{
Type: v1.SeccompProfileTypeRuntimeDefault,
},
},
RestartPolicy: v1.RestartPolicyNever,
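// Schedule the client pod on the same node as the backend to force a hairpin connection through the LB.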
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
},
},
},
},
},
},
},
},
}
ct := pod.Spec.Containers[0]
framework.Logf("PodSpec Image=%v Command=%v Args=%v", ct.Image, ct.Command, ct.Args)

// Create the pod
_, err := cs.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create HTTP test pod: %v", err)
}
// Clean up the pod
defer func() {
err = cs.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
if err != nil {
framework.Logf("Failed to delete pod %s: %v", podName, err)
}
}()

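// gatherLogs prints the test pod's logs, optionally limited to the last 'tail' lines.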
gatherLogs := func(tail int) {
opts := &v1.PodLogOptions{}
if tail > 0 {
opts.TailLines = aws.Int64(int64(tail))
}
logs, errL := cs.CoreV1().Pods(namespace).GetLogs(podName, opts).DoRaw(context.TODO())
if errL != nil {
framework.Logf("Failed to retrieve pod logs when finished: %w", errL)
}
framework.Logf("HTTP test pod logs:\n%s", string(logs))
}

// Wait for the test pod to complete. The poll timeout must be longer than curl's total retry window.
waitCount := 0
err = wait.PollImmediate(15*time.Second, 10*time.Minute, func() (bool, error) {
p, err := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
framework.Logf("Error getting pod %s: %v", podName, err)
return false, err
}
framework.Logf("Pod %s status: Phase=%s", podName, p.Status.Phase)
podFinished := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed

// Periodically collect the last few log lines while waiting.
if waitCount > 0 && waitCount%4 == 0 {
gatherLogs(5)
}
if podFinished {
gatherLogs(0)
}
waitCount++
return podFinished, nil
})
// Check overall error
if err != nil {
return fmt.Errorf("error waiting for pod %s to complete: %v", podName, err)
}

// Inspect the pod's container status for exit code
pod, errS := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if errS != nil {
return fmt.Errorf("failed to get pod %s: %v", podName, errS)
}
if len(pod.Status.ContainerStatuses) == 0 {
return fmt.Errorf("no container statuses found for pod %s", podName)
}
containerStatus := pod.Status.ContainerStatuses[0]

if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
if exitCode != 0 {
if expectFailTest {
framework.Logf("WARNING: Test failed, but failure is explicitly allowed due to the 'ExpectTestFail' flag.")
} else {
return fmt.Errorf("pod %s exited with code %d", podName, exitCode)
}
}
}
return nil
}