Skip to content

Commit

Permalink
add concurrency limiter to ovs-vsctl (#3288)
Browse files Browse the repository at this point in the history
* add concurrency limiter to ovs-vsctl

* allow unlimited concurrency



---------

Signed-off-by: 夜微澜 <[email protected]>
  • Loading branch information
qiutingjun authored Oct 11, 2023
1 parent 286e634 commit b6192f3
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 5 deletions.
1 change: 1 addition & 0 deletions charts/templates/ovncni-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ spec:
- --enable-metrics={{- .Values.networking.ENABLE_METRICS }}
- --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }}
- --enable-tproxy={{ .Values.func.ENABLE_TPROXY }}
- --ovs-vsctl-concurrency={{ .Values.performance.OVS_VSCTL_CONCURRENCY }}
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ performance:
RPMS: "openvswitch-kmod"
GC_INTERVAL: 360
INSPECT_INTERVAL: 20
OVS_VSCTL_CONCURRENCY: 100

debug:
ENABLE_MIRROR: false
Expand Down
3 changes: 3 additions & 0 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
"github.com/kubeovn/kube-ovn/versions"
)
Expand All @@ -37,6 +38,8 @@ func CmdMain() {
util.LogFatalAndExit(err, "failed to do the OS initialization")
}

ovs.UpdateOVSVsctlLimiter(config.OVSVsctlConcurrency)

nicBridgeMappings, err := daemon.InitOVSBridges()
if err != nil {
util.LogFatalAndExit(err, "failed to initialize OVS bridges")
Expand Down
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ IFACE=${IFACE:-}
DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy}
ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true}
ENABLE_TPROXY=${ENABLE_TPROXY:-false}
OVS_VSCTL_CONCURRENCY=${OVS_VSCTL_CONCURRENCY:-100}

# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -4069,6 +4070,7 @@ spec:
- --log_file_max_size=0
- --kubelet-dir=$KUBELET_DIR
- --enable-tproxy=$ENABLE_TPROXY
- --ovs-vsctl-concurrency=$OVS_VSCTL_CONCURRENCY
securityContext:
runAsUser: 0
privileged: true
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Configuration struct {
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
OVSVsctlConcurrency int32
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -102,6 +103,7 @@ func ParseFlags() *Configuration {
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe")
argOVSVsctlConcurrency = pflag.Int32("ovs-vsctl-concurrency", 100, "concurrency limit of ovs-vsctl")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -157,6 +159,7 @@ func ParseFlags() *Configuration {
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
OVSVsctlConcurrency: *argOVSVsctlConcurrency,
}
return config
}
Expand Down
43 changes: 38 additions & 5 deletions pkg/ovs/ovs-vsctl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ovs

import (
"context"
"fmt"
"os/exec"
"regexp"
Expand All @@ -13,25 +14,57 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

var limiter *Limiter

func init() {
limiter = new(Limiter)
}

func UpdateOVSVsctlLimiter(c int32) {
if c >= 0 {
limiter.Update(c)
klog.V(4).Infof("update ovs-vsctl concurrency limit to %d", limiter.Limit())
}
}

// Glory belongs to openvswitch/ovn-kubernetes
// https://github.com/openvswitch/ovn-kubernetes/blob/master/go-controller/pkg/util/ovs.go

var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`)

func Exec(args ...string) (string, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var (
start time.Time
elapsed float64
output []byte
method, code string
err error
)

if err = limiter.Wait(ctx); err != nil {
klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limit of %d", OvsVsCtl, strings.Join(args, " "), limiter.Limit())
return "", err
}
defer limiter.Done()
klog.V(4).Infof("command %s %s waiting for execution concurrency %d/%d", OvsVsCtl, strings.Join(args, " "), limiter.Current(), limiter.Limit())

start = time.Now()
args = append([]string{"--timeout=30"}, args...)
output, err := exec.Command(OvsVsCtl, args...).CombinedOutput()
elapsed := float64((time.Since(start)) / time.Millisecond)
output, err = exec.Command(OvsVsCtl, args...).CombinedOutput()
elapsed = float64((time.Since(start)) / time.Millisecond)
klog.V(4).Infof("command %s %s in %vms", OvsVsCtl, strings.Join(args, " "), elapsed)
method := ""

for _, arg := range args {
if !strings.HasPrefix(arg, "--") {
method = arg
break
}
}
code := "0"

code = "0"
defer func() {
ovsClientRequestLatency.WithLabelValues("ovsdb", method, code).Observe(elapsed)
}()
Expand Down
44 changes: 44 additions & 0 deletions pkg/ovs/util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package ovs

import (
"context"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -235,3 +238,44 @@ func (m aclMatch) String() string {
rule, _ := m.Match()
return rule
}

type Limiter struct {
limit int32
current int32
}

func (l *Limiter) Limit() int32 {
return l.limit
}

func (l *Limiter) Current() int32 {
return atomic.LoadInt32(&l.current)
}

func (l *Limiter) Update(limit int32) {
l.limit = limit
}

func (l *Limiter) Wait(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled by timeout")
default:
if l.limit == 0 {
atomic.AddInt32(&l.current, 1)
return nil
}

if atomic.LoadInt32(&l.current) < l.limit {
atomic.AddInt32(&l.current, 1)
return nil
}
time.Sleep(10 * time.Millisecond)
}
}
}

func (l *Limiter) Done() {
atomic.AddInt32(&l.current, -1)
}
91 changes: 91 additions & 0 deletions pkg/ovs/util_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ovs

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -229,3 +231,92 @@ func Test_OrAclMatch_Match(t *testing.T) {
require.ErrorContains(t, err, "acl rule key is required")
})
}

func Test_Limiter(t *testing.T) {
t.Parallel()

t.Run("without limit", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

limiter.Done()
require.Equal(t, int32(1), limiter.Current())

limiter.Done()
require.Equal(t, int32(0), limiter.Current())
})

t.Run("with limit", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)
limiter.Update(2)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

time.AfterFunc(10*time.Second, func() {
limiter.Done()
require.Equal(t, int32(1), limiter.Current())
})

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())
})

t.Run("with timeout", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)
limiter.Update(2)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

time.AfterFunc(10*time.Second, func() {
limiter.Done()
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err = limiter.Wait(ctx)
require.ErrorContains(t, err, "context canceled by timeout")
require.Equal(t, int32(2), limiter.Current())
})
}
1 change: 1 addition & 0 deletions yamls/kube-ovn-dual-stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --enable-tproxy=false
- --ovs-vsctl-concurrency=100
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions yamls/kube-ovn-ipv6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --enable-tproxy=false
- --ovs-vsctl-concurrency=100
securityContext:
runAsUser: 0
privileged: true
Expand Down

0 comments on commit b6192f3

Please sign in to comment.