Commit

Update

Signed-off-by: Wenqi Qiu <[email protected]>
wenqiq committed Jan 31, 2024
1 parent ec65080 commit 0ab72ca
Showing 7 changed files with 149 additions and 89 deletions.
68 changes: 36 additions & 32 deletions test/performance/framework/case.go
@@ -25,7 +25,7 @@ import (
"antrea.io/antrea/test/performance/framework/table"
)

type RunFunc func(ctx context.Context, ch chan time.Duration, data *ScaleData) error
type RunFunc func(ctx context.Context, ch chan time.Duration, data *ScaleData) ScaleResult

var cases = make(map[string]RunFunc, 128)

@@ -56,46 +56,22 @@ func (c *ScaleTestCase) Name() string {
return c.name
}

type ResponseTime struct {
max time.Duration
min time.Duration
avg time.Duration
type ScaleResult struct {
err error
actualCheckNum int
}

func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
ctx = wrapScaleTestName(ctx, c.name)
done := make(chan interface{}, 1)
done := make(chan ScaleResult, 1)

startTime := time.Now()
caseName := ctx.Value(CtxScaleCaseName).(string)
testData.maxCheckNum = testData.nodesNum * 2
ress := make(chan time.Duration, testData.maxCheckNum)
res := "failed"
actualCheckNum := 0
defer func() {
var rows [][]string

var total, minRes, maxRes, avg time.Duration
count := 0
for i := 0; i < testData.maxCheckNum; i++ {
res := <-ress
total += res
count++

if count == 1 || res < minRes {
minRes = res
}

if res > maxRes {
maxRes = res
}
}

avg = total / time.Duration(count)

rows = append(rows, table.GenerateRow(caseName, res, time.Since(startTime).String(),
avg.String(), maxRes.String(), minRes.String()))
table.ShowResult(os.Stdout, rows)

close(ress)
close(done)
}()
@@ -107,13 +83,41 @@ func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
case scaleRes := <-done:
err := scaleRes.err
if err != nil {
return err.(error)
}
actualCheckNum = scaleRes.actualCheckNum
res = "success"
return nil
// return nil
}

var rows [][]string
var total, minRes, maxRes, avg time.Duration
count := 0
for i := 0; i < actualCheckNum; i++ {
res := <-ress
total += res
count++

if count == 1 || res < minRes {
minRes = res
}

if res > maxRes {
maxRes = res
}
}

if count != 0 {
avg = total / time.Duration(count)
}

rows = append(rows, table.GenerateRow(caseName, res, time.Since(startTime).String(),
avg.String(), maxRes.String(), minRes.String()))
table.ShowResult(os.Stdout, rows)
return nil
}

type CtxScaleCaseNameType string
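The change in case.go replaces the plain error return of each test case with a ScaleResult that also carries the number of latency samples the case actually pushed onto the channel, so the runner drains exactly that many before building the summary row. A minimal, self-contained sketch of that drain-and-summarize pattern (illustrative names only, not the Antrea code itself):

package main

import (
	"fmt"
	"time"
)

// ScaleResult mirrors the shape introduced in case.go: an optional error plus
// the number of duration samples the case actually reported.
type ScaleResult struct {
	err            error
	actualCheckNum int
}

// runCase stands in for a registered RunFunc; it pushes three samples.
func runCase(ch chan time.Duration) ScaleResult {
	for i := 1; i <= 3; i++ {
		ch <- time.Duration(i) * 100 * time.Millisecond
	}
	return ScaleResult{actualCheckNum: 3}
}

func main() {
	ch := make(chan time.Duration, 16)
	res := runCase(ch)
	if res.err != nil {
		fmt.Println("case failed:", res.err)
		return
	}

	// Drain exactly actualCheckNum samples; the old code drained maxCheckNum
	// in a defer and could block when fewer checks actually ran.
	var total, minRes, maxRes time.Duration
	for i := 0; i < res.actualCheckNum; i++ {
		d := <-ch
		total += d
		if i == 0 || d < minRes {
			minRes = d
		}
		if d > maxRes {
			maxRes = d
		}
	}
	if res.actualCheckNum != 0 {
		avg := total / time.Duration(res.actualCheckNum)
		fmt.Println("avg:", avg, "min:", minRes, "max:", maxRes)
	}
}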
26 changes: 19 additions & 7 deletions test/performance/framework/networkpolicy.go
@@ -17,6 +17,7 @@ package framework
import (
"context"
"fmt"
"k8s.io/klog/v2"
"time"

"antrea.io/antrea/test/performance/framework/networkpolicy"
@@ -26,11 +27,24 @@ func init() {
RegisterFunc("ScaleNetworkPolicy", ScaleNetworkPolicy)
}

func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
_, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
defer func() {
for {
if len(ch) == res.actualCheckNum {
break
}
klog.InfoS("Waiting the check goroutine finish")
time.Sleep(time.Second)
}
if err := networkpolicy.ScaleDown(ctx, data.namespaces, data.kubernetesClientSet); err != nil {
klog.ErrorS(err, "Scale down NetworkPolicies failed")
}
}()
checkCount, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
data.Specification.NpNumPerNs, data.clientPods, data.Specification.IPv6, data.maxCheckNum, ch)
if err != nil {
return fmt.Errorf("scale up NetworkPolicies error: %v", err)
res.err = fmt.Errorf("scale up NetworkPolicies error: %v", err)
return
}

// maxNPCheckedCount := data.nodesNum
@@ -62,8 +76,6 @@ func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleD
// }
// klog.InfoS("Checked networkPolicy", "Name", np.Name, "Namespace", np.Namespace, "count", i, "maxNum", maxNPCheckedCount)
// }
if err := networkpolicy.ScaleDown(ctx, data.namespaces, data.kubernetesClientSet); err != nil {
return err
}
return nil
res.actualCheckNum = checkCount
return
}
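ScaleNetworkPolicy's new defer block waits until the buffered channel holds as many samples as checks were launched before tearing the policies down. A rough sketch of that wait, under the assumption (which holds in this design) that nothing drains the channel until the case returns; a sync.WaitGroup would express the same intent more directly:

package main

import (
	"fmt"
	"time"
)

// waitForChecks blocks until every launched check has written its sample.
// Polling len(ch) only works because the consumer drains ch after the case
// has returned, never concurrently with the producers.
func waitForChecks(ch chan time.Duration, expected int) {
	for len(ch) != expected {
		fmt.Println("waiting for check goroutines to finish")
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	ch := make(chan time.Duration, 8)
	const expected = 2
	for i := 0; i < expected; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i+1) * 200 * time.Millisecond) // stand-in for a connectivity probe
			ch <- time.Duration(i+1) * 200 * time.Millisecond
		}(i)
	}
	waitForChecks(ch, expected)
	fmt.Println("all checks reported; safe to scale the NetworkPolicies back down")
}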
13 changes: 7 additions & 6 deletions test/performance/framework/networkpolicy/scale_up.go
@@ -17,7 +17,6 @@ package networkpolicy
import (
"context"
"fmt"
"k8s.io/client-go/rest"
"time"

"github.com/google/uuid"
@@ -27,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"antrea.io/antrea/test/performance/utils"
@@ -94,14 +94,14 @@ type NetworkPolicyInfo struct {
toIP string
}

func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (nps []NetworkPolicyInfo, err error) {
func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (actualCheckNUm int, err error) {
// ScaleUp networkPolicies
start := time.Now()
checkCount := 0
for _, ns := range nss {
npsData, err := generateNetworkPolicies(ns, numPerNs)
if err != nil {
return nil, fmt.Errorf("error when generating network policies: %w", err)
return 0, fmt.Errorf("error when generating network policies: %w", err)
}
klog.InfoS("Scale up NetworkPolicies", "Num", len(npsData), "Namespace", ns)
for _, np := range npsData {
@@ -122,8 +122,10 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
npInfo = NetworkPolicyInfo{Name: newNP.Name, Namespace: newNP.Namespace, Spec: newNP.Spec}
return nil
}); err != nil {
return nil, err
return 0, err
}

// sample num
if shouldCheck {
// Check connection of Pods in NetworkPolicies, workload Pods
fromPod, ip, err := SelectConnectPod(ctx, cs, npInfo.Namespace, &npInfo)
@@ -133,7 +135,6 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
npInfo.fromPodName = fromPod.Name
npInfo.fromPodNamespace = fromPod.Namespace
npInfo.toIP = ip
nps = append(nps, npInfo)
checkCount++
go func() {
if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, false); err != nil {
@@ -155,7 +156,7 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
}
}
}
klog.InfoS("Scale up NetworkPolicies", "Duration", time.Since(start), "count", len(nps))
klog.InfoS("Scale up NetworkPolicies", "Duration", time.Since(start), "actualCheckNum", checkCount)
return
}

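ScaleUp now reports how many connectivity checks it actually launched instead of returning the full NetworkPolicyInfo slice; each sampled policy gets a goroutine that probes connectivity and writes its latency to the channel. A hedged sketch of the sampling idea (the 1-in-10 sampling rate and helper names are illustrative, not taken from the Antrea code):

package main

import (
	"fmt"
	"time"
)

// scaleUp creates `total` policies and samples a bounded subset of them for an
// asynchronous connectivity check, returning the number of checks launched so
// the caller knows how many duration samples to expect on ch.
func scaleUp(total, maxCheckNum int, ch chan time.Duration) (actualCheckNum int) {
	for i := 0; i < total; i++ {
		// ... create NetworkPolicy i here ...
		shouldCheck := actualCheckNum < maxCheckNum && i%10 == 0
		if !shouldCheck {
			continue
		}
		actualCheckNum++
		start := time.Now()
		go func() {
			time.Sleep(50 * time.Millisecond) // stand-in for probing the connection governed by the policy
			ch <- time.Since(start)
		}()
	}
	return actualCheckNum
}

func main() {
	ch := make(chan time.Duration, 64)
	n := scaleUp(100, 20, ch)
	fmt.Println("checks launched:", n)
	for i := 0; i < n; i++ {
		fmt.Println("latency:", <-ch)
	}
}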
48 changes: 29 additions & 19 deletions test/performance/framework/pods.go
@@ -17,18 +17,17 @@ package framework
import (
"context"
"fmt"
"k8s.io/klog/v2"
"time"

"antrea.io/antrea/test/performance/config"
"antrea.io/antrea/test/performance/utils"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/test/performance/config"
"antrea.io/antrea/test/performance/utils"
)

func init() {
@@ -98,14 +97,19 @@ func newWorkloadPod(podName, ns string, onRealNode bool, labelNum int) *corev1.P
return workloadPodTemplate(podName, ns, labels, onRealNode)
}

func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
if data.Specification.SkipDeployWorkload {
klog.V(2).InfoS("Skip creating workload Pods", "SkipDeployWorkload", data.Specification.SkipDeployWorkload)
return nil
return
}
// Creating workload Pods
start := time.Now()
podNum := data.Specification.PodsNumPerNs
count := 0
for _, ns := range data.namespaces {
gErr, _ := errgroup.WithContext(context.Background())
for i := 0; i < podNum; i++ {
@@ -126,12 +130,12 @@ func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *Scale
})
}
klog.V(2).InfoS("Create workload Pods", "PodNum", podNum, "Namespace", ns)
if err := gErr.Wait(); err != nil {
return err
if err = gErr.Wait(); err != nil {
return
}

// Waiting scale workload Pods to be ready
err := wait.PollUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollUntil(config.WaitInterval, func() (bool, error) {
podsResult, err := data.kubernetesClientSet.
CoreV1().Pods(ns).
List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", AppLabelKey, AppLabelValue)})
@@ -150,17 +154,23 @@ func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *Scale
return false, nil
}, ctx.Done())
if err != nil {
return err
return
}
go func() {
select {
case ch <- time.Since(start):
klog.InfoS("Successfully write in channel")
default:
klog.InfoS("Skipped writing to the channel. No receiver.")
}
}()

if count < data.maxCheckNum {
ch <- time.Since(start)
count++
}
// go func() {
// select {
// case ch <- time.Since(start):
// klog.InfoS("Successfully write in channel")
// default:
// klog.InfoS("Skipped writing to the channel. No receiver.")
// }
// }()
}
res.actualCheckNum = count
klog.InfoS("Scaled up Pods", "Duration", time.Since(start), "count", podNum*len(data.namespaces))
return nil
return res
}
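In ScaleUpWorkloadPods, the unconditional goroutine send (now commented out) is replaced by a bounded, synchronous send: at most maxCheckNum samples go onto the channel, and the count is reported back as actualCheckNum. A small sketch of why this never blocks; case.go creates the channel with capacity maxCheckNum, and the namespace names here are stand-ins:

package main

import (
	"fmt"
	"time"
)

func main() {
	maxCheckNum := 3
	// The framework creates the results channel with capacity maxCheckNum,
	// so sending at most maxCheckNum samples can never block.
	ch := make(chan time.Duration, maxCheckNum)

	start := time.Now()
	count := 0
	namespaces := []string{"ns-1", "ns-2", "ns-3", "ns-4", "ns-5"}
	for range namespaces {
		// ... create the namespace's workload Pods and wait for them to be Ready ...
		if count < maxCheckNum {
			ch <- time.Since(start)
			count++
		}
	}
	// count becomes ScaleResult.actualCheckNum, telling the runner how many
	// samples to drain.
	fmt.Println("samples reported:", count)
}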
30 changes: 20 additions & 10 deletions test/performance/framework/restart.go
@@ -34,13 +34,17 @@ func init() {
RegisterFunc("RestartOVSContainer", RestartOVSContainer)
}

func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
err = data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-agent"})
if err != nil {
return err
return
}
return wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
var ds *appv1.DaemonSet
if err := utils.DefaultRetry(func() error {
var err error
@@ -55,15 +59,20 @@ func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleDa
"NumberAvailable", ds.Status.NumberAvailable)
return ds.Status.DesiredNumberScheduled == ds.Status.NumberAvailable, nil
}, ctx.Done())
return
}

func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
err = data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-controller"})
if err != nil {
return err
return
}
return wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
var dp *appv1.Deployment
if err := utils.DefaultRetry(func() error {
var err error
@@ -72,10 +81,11 @@ func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleDa
}); err != nil {
return false, err
}
return dp.Status.UnavailableReplicas == 0, nil
return dp.Status.ObservedGeneration == dp.Generation && dp.Status.ReadyReplicas == *dp.Spec.Replicas, nil
}, ctx.Done())
return
}

func RestartOVSContainer(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func RestartOVSContainer(ctx context.Context, ch chan time.Duration, data *ScaleData) ScaleResult {
return ScaleRestartAgent(ctx, ch, data)
}
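RestartController's readiness condition changes from UnavailableReplicas == 0 to requiring that the Deployment has observed its latest generation and that all desired replicas are Ready, which avoids declaring success before the rollout of the restarted controller has actually been processed. A hedged client-go sketch of that check (the package name, namespace, Deployment name, and poll interval are assumptions for illustration):

package scaletest

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForControllerReady polls until the antrea-controller Deployment has seen
// its newest spec and every desired replica reports Ready.
func waitForControllerReady(ctx context.Context, cs kubernetes.Interface) error {
	return wait.PollImmediateUntil(5*time.Second, func() (bool, error) {
		dp, err := cs.AppsV1().Deployments("kube-system").Get(ctx, "antrea-controller", metav1.GetOptions{})
		if err != nil {
			return false, nil // treat errors as transient and keep polling
		}
		if dp.Spec.Replicas == nil {
			return false, nil
		}
		// UnavailableReplicas == 0 can momentarily hold before the Deployment
		// controller has even observed the new generation; checking
		// ObservedGeneration and ReadyReplicas closes that window.
		return dp.Status.ObservedGeneration == dp.Generation &&
			dp.Status.ReadyReplicas == *dp.Spec.Replicas, nil
	}, ctx.Done())
}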
