Skip to content

Commit

Permalink
test: decreased ha replicas to 2 + small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fra98 authored and adamjensenbot committed Nov 18, 2024
1 parent b242938 commit f8312ec
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 29 deletions.
17 changes: 11 additions & 6 deletions pkg/gateway/concurrent/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package concurrent

import (
"context"
"fmt"
"net"

corev1 "k8s.io/api/core/v1"
Expand All @@ -31,16 +32,16 @@ var _ manager.Runnable = &RunnableGateway{}
// RunnableGateway wraps a gateway replica so that it can be executed by a
// controller-runtime manager as a Runnable.
type RunnableGateway struct {
	// Client is the Kubernetes client used to interact with the API server.
	Client client.Client

	// PodName is the name of the pod hosting this gateway replica.
	PodName string
	// GatewayName identifies the gateway this replica belongs to.
	GatewayName string
	// Namespace is the namespace where the gateway replicas run.
	Namespace string

	// Socket is the unix listen socket shared with the guest containers.
	Socket net.Listener
	// GuestConnections tracks the IPC connections of the guest containers.
	GuestConnections ipc.GuestConnections
}

// NewRunnableGateway creates a new Runnable.
func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace string, containerNames []string) (*RunnableGateway, error) {
func NewRunnableGateway(cl client.Client, podName, gatewayName, namespace string, containerNames []string) (*RunnableGateway, error) {
guestConnections := ipc.NewGuestConnections(containerNames)

socket, err := ipc.CreateListenSocket(unixSocketPath)
Expand All @@ -56,7 +57,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str
return &RunnableGateway{
Client: cl,
PodName: podName,
DeploymentName: deploymentName,
GatewayName: gatewayName,
Namespace: namespace,
Socket: socket,
GuestConnections: guestConnections,
Expand All @@ -67,7 +68,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str
func (rg *RunnableGateway) Start(ctx context.Context) error {
defer rg.Close()

pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.DeploymentName)
pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.GatewayName)
if err != nil {
return err
}
Expand All @@ -84,6 +85,10 @@ func (rg *RunnableGateway) Start(ctx context.Context) error {
}
}

if activePod == nil {
return fmt.Errorf("active gateway pod not found")
}

if err := AddActiveGatewayLabel(ctx, rg.Client, client.ObjectKeyFromObject(activePod)); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/gateway/concurrent/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func RemoveActiveGatewayLabel(ctx context.Context, cl client.Client, key client.
return nil
}

// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same deployment.
func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, deploymentName string) ([]corev1.Pod, error) {
// ListAllGatewaysReplicas returns the list of all the gateway replicas belonging to the given gateway.
func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, gatewayName string) ([]corev1.Pod, error) {
podList := &corev1.PodList{}
if err := cl.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels{
consts.K8sAppNameKey: deploymentName,
consts.K8sAppNameKey: gatewayName,
}); err != nil {
return nil, err
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/utils/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package maps
import (
"fmt"
"maps"
"sort"
"slices"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -179,16 +179,14 @@ func UpdateCache(annots, template map[string]string, cacheKey string) map[string
// SerializeMap converts a map into a string of concatenated keys, each
// followed by a comma. Keys are sorted alphabetically so that the output is
// deterministic regardless of map iteration order.
func SerializeMap(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	slices.Sort(keys)

	// Use strings.Builder to avoid the quadratic cost of repeated string
	// concatenation in a loop.
	var sb strings.Builder
	for _, k := range keys {
		sb.WriteString(k)
		sb.WriteString(",")
	}
	return sb.String()
}
Expand Down
54 changes: 49 additions & 5 deletions test/e2e/cruise/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

const (
// clustersRequired is the number of clusters required in this E2E test.
clustersRequired = 2
clustersRequired = 3
// testName is the name of this E2E test.
testName = "NETWORK"
// StressMax is the maximum number of stress iterations.
Expand Down Expand Up @@ -126,16 +126,29 @@ var _ = Describe("Liqo E2E", func() {
When("\"liqoctl test network\" runs", func() {
It("should succeed both before and after gateway pods restart", func() {
// Run the tests.
Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(defaultArgs)
}, timeout, interval).Should(Succeed())

// Restart the gateway pods.
for i := range testContext.Clusters {
RestartPods(testContext.Clusters[i].ControllerClient)
}

// Check if there is only one active gateway pod per remote cluster.
for i := range testContext.Clusters {
numActiveGateway := testContext.Clusters[i].NumPeeredConsumers + testContext.Clusters[i].NumPeeredProviders
Eventually(func() error {
return checkUniqueActiveGatewayPod(testContext.Clusters[i].ControllerClient, numActiveGateway)
}, timeout, interval).Should(Succeed())
}

// Run the tests again.
Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(defaultArgs)
}, timeout, interval).Should(Succeed())
})

It("should succeed both before and after gateway pods restart (stress gateway deletion and run basic tests)", func() {
args := defaultArgs
args.basic = true
Expand All @@ -146,12 +159,22 @@ var _ = Describe("Liqo E2E", func() {
RestartPods(testContext.Clusters[j].ControllerClient)
}

// Check if there is only one active gateway pod per remote cluster.
for j := range testContext.Clusters {
numActiveGateway := testContext.Clusters[j].NumPeeredConsumers + testContext.Clusters[j].NumPeeredProviders
Eventually(func() error {
return checkUniqueActiveGatewayPod(testContext.Clusters[j].ControllerClient, numActiveGateway)
}, timeout, interval).Should(Succeed())
}

if i == stressMax-1 {
args.remove = true
}

// Run the tests.
Eventually(runLiqoctlNetworkTests(args), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(args)
}, timeout, interval).Should(Succeed())
}
})
})
Expand Down Expand Up @@ -299,7 +322,6 @@ func RestartPods(cl client.Client) {
if err := cl.List(ctx, deploymentList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
gateway.GatewayComponentKey: gateway.GatewayComponentGateway,
concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue,
}),
}); err != nil {
return err
Expand All @@ -314,3 +336,25 @@ func RestartPods(cl client.Client) {
return nil
}, timeout, interval).Should(Succeed())
}

// checkUniqueActiveGatewayPod verifies that the number of gateway pods
// currently labeled as active matches the expected number of active gateways.
func checkUniqueActiveGatewayPod(cl client.Client, numActiveGateway int) error {
	// Give the leader election some time to settle on the new active replica.
	time.Sleep(2 * time.Second)

	// Select only gateway pods carrying the "active" label.
	selector := labels.SelectorFromSet(labels.Set{
		gateway.GatewayComponentKey: gateway.GatewayComponentGateway,
		concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue,
	})

	activePods := &corev1.PodList{}
	if err := cl.List(ctx, activePods, &client.ListOptions{LabelSelector: selector}); err != nil {
		return fmt.Errorf("unable to list active gateway pods: %w", err)
	}

	if got := len(activePods.Items); got != numActiveGateway {
		return fmt.Errorf("expected %d active gateway pod, got %d", numActiveGateway, got)
	}

	return nil
}
4 changes: 2 additions & 2 deletions test/e2e/pipeline/infra/cluster-api/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ WORKDIR=$(dirname "$FILEPATH")
source "$WORKDIR/../../utils.sh"

# shellcheck disable=SC1091
# shellcheck source=./cni.sh
source "$WORKDIR/cni.sh"
# shellcheck source=../cni.sh
source "$WORKDIR/../cni.sh"

export K8S_VERSION=${K8S_VERSION:-"1.29.7"}
K8S_VERSION=$(echo -n "$K8S_VERSION" | sed 's/v//g') # remove the leading v
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion test/e2e/pipeline/infra/kind/pre-requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ install_kubectl "${OS}" "${ARCH}" "${K8S_VERSION}"

install_helm "${OS}" "${ARCH}"

KIND_VERSION="v0.23.0"
KIND_VERSION="v0.24.0"

echo "Downloading Kind ${KIND_VERSION}"

Expand Down
9 changes: 9 additions & 0 deletions test/e2e/pipeline/infra/kind/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ WORKDIR=$(dirname "$FILEPATH")
# shellcheck source=../../utils.sh
source "$WORKDIR/../../utils.sh"

# shellcheck disable=SC1091
# shellcheck source=../cni.sh
source "$WORKDIR/../cni.sh"

KIND="${BINDIR}/kind"

CLUSTER_NAME=cluster
Expand All @@ -59,6 +63,11 @@ do
echo "Creating cluster ${CLUSTER_NAME}${i}..."
${KIND} create cluster --name "${CLUSTER_NAME}${i}" --kubeconfig "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}" --config "${TMPDIR}/liqo-cluster-${CLUSTER_NAME}${i}.yaml" --wait 2m

# Install CNI if kindnet disabled
if [[ ${DISABLE_KINDNET} == "true" ]]; then
"install_${CNI}" "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}"
fi

# Install metrics-server
install_metrics_server "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}"
done
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pipeline/installer/liqoctl/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ LIQO_VERSION="${LIQO_VERSION:-$(git rev-parse HEAD)}"
export SERVICE_CIDR=10.100.0.0/16
export POD_CIDR=10.200.0.0/16
export POD_CIDR_OVERLAPPING=${POD_CIDR_OVERLAPPING:-"false"}
export HA_REPLICAS=3
export HA_REPLICAS=2

for i in $(seq 1 "${CLUSTER_NUMBER}");
do
Expand Down
25 changes: 24 additions & 1 deletion test/e2e/postinstall/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"github.com/liqotech/liqo/pkg/consts"
gwconsts "github.com/liqotech/liqo/pkg/gateway"
fcutils "github.com/liqotech/liqo/pkg/utils/foreigncluster"
"github.com/liqotech/liqo/pkg/vkMachinery"
"github.com/liqotech/liqo/test/e2e/testutils/tester"
"github.com/liqotech/liqo/test/e2e/testutils/util"
)
Expand Down Expand Up @@ -92,8 +95,28 @@ var _ = Describe("Liqo E2E", func() {
for _, tenantNs := range tenantNsList.Items {
Eventually(func() bool {
readyPods, notReadyPods, err := util.ArePodsUp(ctx, cluster.NativeClient, tenantNs.Name)
Expect(err).ToNot(HaveOccurred())
klog.Infof("Tenant pods status: %d ready, %d not ready", len(readyPods), len(notReadyPods))
return err == nil && len(notReadyPods) == 0 && len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role)
// Get deployment gateway
gwDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", gwconsts.GatewayComponentKey, gwconsts.GatewayComponentGateway),
})
Expect(err).ToNot(HaveOccurred())
Expect(gwDeployments.Items).To(HaveLen(1))
gwReplicas := int(ptr.Deref(gwDeployments.Items[0].Spec.Replicas, 1))

// Get deployment virtual-kubelet if role is consumer
vkReplicas := 0
if fcutils.IsConsumer(cluster.Role) {
vkDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(vkMachinery.KubeletBaseLabels).String(),
})
Expect(err).ToNot(HaveOccurred())
Expect(vkDeployments.Items).To(HaveLen(1))
vkReplicas = int(ptr.Deref(vkDeployments.Items[0].Spec.Replicas, 1))
}
return len(notReadyPods) == 0 &&
len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role, gwReplicas, vkReplicas)
}, timeout, interval).Should(BeTrue())
}
},
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/testutils/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func ArePodsUp(ctx context.Context, clientset kubernetes.Interface, namespace st
}

// NumPodsInTenantNs returns the number of pods that should be present in a tenant namespace.
func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType) int {
func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType, gwReplicas, vkReplicas int) int {
count := 0
// If the network is enabled, it should have the gateway pod.
if networkingEnabled {
count += 3
count += gwReplicas
}
// If the cluster is a consumer, it should have the virtual-kubelet pod.
if fcutils.IsConsumer(role) {
count++
count += vkReplicas
}
return count
}
Expand Down

0 comments on commit f8312ec

Please sign in to comment.