Skip to content

Commit

Permalink
Add GlobalNet E2E test to update service port
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and aswinsuryan committed Dec 11, 2023
1 parent 793f75e commit 54d6d18
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 82 deletions.
53 changes: 47 additions & 6 deletions test/e2e/dataplane/tcp_gn_pod_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,35 @@ limitations under the License.
package dataplane

import (
"context"
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
"github.com/submariner-io/shipyard/test/e2e/framework"
"github.com/submariner-io/shipyard/test/e2e/tcp"
subFramework "github.com/submariner-io/submariner/test/e2e/framework"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("Basic TCP connectivity tests across overlapping clusters without discovery", Label(TestLabel, "globalnet"), func() {
f := framework.NewFramework("dataplane-gn-conn-nd")
var toEndpointType tcp.EndpointType
var networking framework.NetworkingType
var egressIPType subFramework.GlobalEgressIPType
var fromCluster framework.ClusterIndex
var toCluster framework.ClusterIndex

var (
toEndpointType tcp.EndpointType
networking framework.NetworkingType
egressIPType subFramework.GlobalEgressIPType
fromCluster framework.ClusterIndex
toCluster framework.ClusterIndex
runAdditionalTest subFramework.RunAdditionalTestFn
)

BeforeEach(func() {
runAdditionalTest = nil
})

verifyInteraction := func(fromClusterScheduling, toClusterScheduling framework.NetworkPodScheduling) {
It("should have sent the expected data from the pod to the other pod", func() {
Expand All @@ -53,7 +69,11 @@ var _ = Describe("Basic TCP connectivity tests across overlapping clusters witho
FromClusterScheduling: fromClusterScheduling,
ToCluster: toCluster,
ToClusterScheduling: toClusterScheduling,
}, subFramework.GetGlobalnetEgressParams(egressIPType))
}, subFramework.GlobalnetTestParams{
GlobalnetEnabled: framework.TestContext.GlobalnetEnabled,
GlobalEgressIP: egressIPType,
RunAdditionalTest: runAdditionalTest,
})
})
}

Expand All @@ -67,6 +87,27 @@ var _ = Describe("Basic TCP connectivity tests across overlapping clusters witho
})

When("the pod is not on a gateway and the remote service is not on a gateway", Label(framework.BasicTestLabel), func() {
BeforeEach(func() {
runAdditionalTest = func(lpConfig framework.NetworkPodConfig, cpConfig framework.NetworkPodConfig, service *v1.Service,
verifyConnectivity func(listener *framework.NetworkPod, connector *framework.NetworkPod),
) {
lpConfig.Port++
cpConfig.Port = lpConfig.Port

By(fmt.Sprintf("Updating service port to %d", lpConfig.Port))

err := util.Update(context.Background(), resource.ForService(framework.KubeClients[lpConfig.Cluster],
f.Namespace), service, func(existing *v1.Service) (*v1.Service, error) {
existing.Spec.Ports[0].Port = int32(lpConfig.Port)
existing.Spec.Ports[0].TargetPort = intstr.FromInt32(int32(lpConfig.Port))
return existing, nil
})
Expect(err).To(Succeed())

verifyConnectivity(f.NewNetworkPod(&lpConfig), f.NewNetworkPod(&cpConfig))
}
})

verifyInteraction(framework.NonGatewayNode, framework.NonGatewayNode)
})

Expand Down
161 changes: 85 additions & 76 deletions test/e2e/framework/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,24 @@ func (t GlobalEgressIPType) String() string {
return "unknown"
}

type RunAdditionalTestFn func(listenerPodConfig framework.NetworkPodConfig, connectorPodConfig framework.NetworkPodConfig,
service *v1.Service, verifyConnectivity func(listener *framework.NetworkPod, connector *framework.NetworkPod))

type GlobalnetTestParams struct {
GlobalnetEnabled bool
GlobalEgressIP GlobalEgressIPType
GlobalnetEnabled bool
GlobalEgressIP GlobalEgressIPType
RunAdditionalTest RunAdditionalTestFn
}

func VerifyDatapathConnectivity(p tcp.ConnectivityTestParams, gn GlobalnetTestParams) {
if gn.GlobalnetEnabled {
verifyGlobalnetDatapathConnectivity(p, gn.GlobalEgressIP)
verifyGlobalnetDatapathConnectivity(p, gn)
} else {
tcp.RunConnectivityTest(p)
}
}

func verifyGlobalnetDatapathConnectivity(p tcp.ConnectivityTestParams, egressIPType GlobalEgressIPType) {
func verifyGlobalnetDatapathConnectivity(p tcp.ConnectivityTestParams, gn GlobalnetTestParams) {
Expect(p.ToEndpointType).To(BeElementOf([]tcp.EndpointType{tcp.GlobalServiceIP, tcp.GlobalPodIP}))

if p.ConnectionTimeout == 0 {
Expand All @@ -85,16 +89,44 @@ func verifyGlobalnetDatapathConnectivity(p tcp.ConnectivityTestParams, egressIPT
p.ConnectionAttempts = framework.TestContext.ConnectionAttempts
}

var connectorPodGlobalIPs []string

switch gn.GlobalEgressIP {
case ClusterSelector:
connectorPodGlobalIPs = p.Framework.AwaitClusterGlobalEgressIPs(p.FromCluster, constants.ClusterGlobalEgressIPName)
case NameSpaceSelector:
geipObject, err := newGlobalEgressIPObj(p.Framework.Namespace, nil)
Expect(err).To(Succeed())

err = framework.CreateGlobalEgressIP(p.FromCluster, geipObject)
Expect(err).To(Succeed())

connectorPodGlobalIPs = framework.AwaitGlobalEgressIPs(p.FromCluster, geipObject.GetName(), p.Framework.Namespace)
case PodSelector:
podSelector := &metav1.LabelSelector{MatchLabels: map[string]string{"test-app": "custom"}}
geipObject, err := newGlobalEgressIPObj(p.Framework.Namespace, podSelector)
Expect(err).To(Succeed())

err = framework.CreateGlobalEgressIP(p.FromCluster, geipObject)
Expect(err).To(Succeed())

connectorPodGlobalIPs = framework.AwaitGlobalEgressIPs(p.FromCluster, geipObject.GetName(), p.Framework.Namespace)
}

Expect(connectorPodGlobalIPs).ToNot(BeEmpty())

By(fmt.Sprintf("Creating a listener pod in cluster %q, which will wait for a handshake over TCP",
framework.TestContext.ClusterIDs[p.ToCluster]))

listenerPod := p.Framework.NewNetworkPod(&framework.NetworkPodConfig{
listenerPodConfig := &framework.NetworkPodConfig{
Type: framework.ListenerPod,
Cluster: p.ToCluster,
Scheduling: p.ToClusterScheduling,
ConnectionTimeout: p.ConnectionTimeout,
ConnectionAttempts: p.ConnectionAttempts,
})
}

listenerPod := p.Framework.NewNetworkPod(listenerPodConfig)

By(fmt.Sprintf("Pointing a ClusterIP service to the listener pod in cluster %q",
framework.TestContext.ClusterIDs[p.ToCluster]))
Expand All @@ -115,92 +147,69 @@ func verifyGlobalnetDatapathConnectivity(p tcp.ConnectivityTestParams, egressIPT
By(fmt.Sprintf("Creating a connector pod in cluster %q, which will attempt the specific UUID handshake over TCP",
framework.TestContext.ClusterIDs[p.FromCluster]))

var connectorPodGlobalIPs []string

switch egressIPType {
case ClusterSelector:
connectorPodGlobalIPs = p.Framework.AwaitClusterGlobalEgressIPs(p.FromCluster, constants.ClusterGlobalEgressIPName)
case NameSpaceSelector:
geipObject, err := newGlobalEgressIPObj(listenerPod.Pod.Namespace, nil)
Expect(err).To(Succeed())

err = framework.CreateGlobalEgressIP(p.FromCluster, geipObject)
Expect(err).To(Succeed())

connectorPodGlobalIPs = framework.AwaitGlobalEgressIPs(p.FromCluster, geipObject.GetName(), listenerPod.Pod.Namespace)
case PodSelector:
podSelector := &metav1.LabelSelector{MatchLabels: map[string]string{"test-app": "custom"}}
geipObject, err := newGlobalEgressIPObj(listenerPod.Pod.Namespace, podSelector)
Expect(err).To(Succeed())

err = framework.CreateGlobalEgressIP(p.FromCluster, geipObject)
Expect(err).To(Succeed())

connectorPodGlobalIPs = framework.AwaitGlobalEgressIPs(p.FromCluster, geipObject.GetName(), listenerPod.Pod.Namespace)
}

Expect(connectorPodGlobalIPs).ToNot(BeEmpty())

connectorPod := p.Framework.NewNetworkPod(&framework.NetworkPodConfig{
connectorPodConfig := &framework.NetworkPodConfig{
Type: framework.CustomPod,
Cluster: p.FromCluster,
Scheduling: p.FromClusterScheduling,
Networking: p.Networking,
ContainerName: "connector-pod",
ImageName: framework.TestContext.NettestImageURL,
Command: []string{"sleep", "600"},
})

cmd := []string{"sh", "-c", "for j in $(seq 1 " + strconv.Itoa(int(connectorPod.Config.NumOfDataBufs)) + "); do echo" +
" [dataplane] connector says " + connectorPod.Config.Data + "; done" +
" | for i in $(seq " + strconv.Itoa(int(p.ConnectionAttempts)) + ");" +
" do if nc -v " + remoteIP + " " + strconv.Itoa(connectorPod.Config.Port) + " -w " + strconv.Itoa(int(p.ConnectionTimeout)) + ";" +
" then break; else sleep " + strconv.Itoa(int(p.ConnectionTimeout/2)) + "; fi; done"}

stdOut, _, err := execCmdInBash(p, cmd, connectorPod.Pod)
Expect(err).ToNot(HaveOccurred())
}

By(fmt.Sprintf("Connector pod is scheduled on node %q", connectorPod.Pod.Spec.NodeName))
verifyConnectivity := func(listener *framework.NetworkPod, connector *framework.NetworkPod) {
cmd := []string{"sh", "-c", "for j in $(seq 1 " + strconv.Itoa(int(connector.Config.NumOfDataBufs)) + "); do echo" +
" [dataplane] connector says " + connector.Config.Data + "; done" +
" | for i in $(seq " + strconv.Itoa(int(listener.Config.ConnectionAttempts)) + ");" +
" do if nc -v " + remoteIP + " " + strconv.Itoa(connector.Config.Port) + " -w " +
strconv.Itoa(int(listener.Config.ConnectionTimeout)) + ";" +
" then break; else sleep " + strconv.Itoa(int(listener.Config.ConnectionTimeout/2)) + "; fi; done"}

stdOut, _, err := p.Framework.ExecWithOptions(context.TODO(), &framework.ExecOptions{
Command: cmd,
Namespace: connector.Pod.Namespace,
PodName: connector.Pod.Name,
ContainerName: connector.Pod.Spec.Containers[0].Name,
Stdin: nil,
CaptureStdout: true,
CaptureStderr: true,
PreserveWhitespace: true,
}, connector.Config.Cluster)
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("Connector pod is scheduled on node %q", connector.Pod.Spec.NodeName))

By(fmt.Sprintf("Waiting for the listener pod %q on node %q to exit, returning what listener sent",
listener.Pod.Name, listener.Pod.Spec.NodeName))
listener.AwaitFinish()
listener.CheckSuccessfulFinish()
p.Framework.DeletePod(p.FromCluster, connector.Pod.Name, connector.Pod.Namespace)

By("Verifying that the listener got the connector's data and the connector got the listener's data")
Expect(listener.TerminationMessage).To(ContainSubstring(connector.Config.Data))
Expect(stdOut).To(ContainSubstring(listener.Config.Data))

By(fmt.Sprintf("Verifying the output of the listener pod contains a %s global IP %v of the connector Pod",
gn.GlobalEgressIP, connectorPodGlobalIPs))

matchIP := ContainSubstring(connectorPodGlobalIPs[0])
for i := 1; i < len(connectorPodGlobalIPs); i++ {
matchIP = Or(matchIP, ContainSubstring(connectorPodGlobalIPs[i]))
}

By(fmt.Sprintf("Waiting for the listener pod %q on node %q to exit, returning what listener sent",
listenerPod.Pod.Name, listenerPod.Pod.Spec.NodeName))
listenerPod.AwaitFinish()
listenerPod.CheckSuccessfulFinish()
p.Framework.DeletePod(p.FromCluster, connectorPod.Pod.Name, connectorPod.Pod.Namespace)
Expect(listener.TerminationMessage).To(matchIP)

By("Verifying that the listener got the connector's data and the connector got the listener's data")
Expect(listenerPod.TerminationMessage).To(ContainSubstring(connectorPod.Config.Data))
Expect(stdOut).To(ContainSubstring(listenerPod.Config.Data))
p.Framework.DeletePod(listener.Config.Cluster, listener.Pod.Name, listener.Pod.Namespace)
}

By(fmt.Sprintf("Verifying the output of the listener pod contains a %s global IP %v of the connector Pod",
egressIPType, connectorPodGlobalIPs))
verifyConnectivity(listenerPod, p.Framework.NewNetworkPod(connectorPodConfig))

matchIP := ContainSubstring(connectorPodGlobalIPs[0])
for i := 1; i < len(connectorPodGlobalIPs); i++ {
matchIP = Or(matchIP, ContainSubstring(connectorPodGlobalIPs[i]))
if gn.RunAdditionalTest != nil {
gn.RunAdditionalTest(*listenerPodConfig, *connectorPodConfig, service, verifyConnectivity)
}

Expect(listenerPod.TerminationMessage).To(matchIP)

p.Framework.DeleteServiceExport(p.ToCluster, service.Name)
p.Framework.DeleteService(p.ToCluster, service.Name)
p.Framework.DeletePod(p.ToCluster, listenerPod.Pod.Name, listenerPod.Pod.Namespace)
p.Framework.DeletePod(p.FromCluster, connectorPod.Pod.Name, connectorPod.Pod.Namespace)
}

func execCmdInBash(p tcp.ConnectivityTestParams, cmd []string, pod *v1.Pod) (string, string, error) {
execOptions := framework.ExecOptions{
Command: cmd,
Namespace: pod.Namespace,
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
Stdin: nil,
CaptureStdout: true,
CaptureStderr: true,
PreserveWhitespace: true,
}

return p.Framework.ExecWithOptions(context.TODO(), &execOptions, p.FromCluster)
}

func getGlobalIngressIP(p tcp.ConnectivityTestParams, service *v1.Service) string {
Expand Down

0 comments on commit 54d6d18

Please sign in to comment.