Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace lr,ls,lrp function call with ovnClient #2477

Merged
merged 5 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -868,17 +869,14 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runAddSubnetWorker, time.Second, ctx.Done())
go wait.Until(c.runAddVlanWorker, time.Second, ctx.Done())
go wait.Until(c.runAddNamespaceWorker, time.Second, ctx.Done())
for {
klog.Infof("wait for %s and %s ready", c.config.DefaultLogicalSwitch, c.config.NodeSwitch)
time.Sleep(3 * time.Second)
lss, err := c.ovnLegacyClient.ListLogicalSwitch(c.config.EnableExternalVpc)
if err != nil {
util.LogFatalAndExit(err, "failed to list logical switch")
}

if util.IsStringIn(c.config.DefaultLogicalSwitch, lss) && util.IsStringIn(c.config.NodeSwitch, lss) && c.addNamespaceQueue.Len() == 0 {
break
}
err := wait.PollUntil(3*time.Second, func() (done bool, err error) {
subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
klog.Infof("wait for subnets %v ready", subnets)

return c.allSubnetReady(subnets...)
}, ctx.Done())
if err != nil {
klog.Fatalf("wait default/join subnet ready error: %v", err)
}

go wait.Until(c.runAddSgWorker, time.Second, ctx.Done())
Expand Down Expand Up @@ -1009,8 +1007,6 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
}

go wait.Until(c.syncVmLiveMigrationPort, 15*time.Second, ctx.Done())

go wait.Until(c.runAddVirtualIpWorker, time.Second, ctx.Done())
go wait.Until(c.runUpdateVirtualIpWorker, time.Second, ctx.Done())
go wait.Until(c.runDelVirtualIpWorker, time.Second, ctx.Done())
Expand Down Expand Up @@ -1040,3 +1036,18 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runDelPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
}
}

func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
for _, lsName := range subnets {
exist, err := c.ovnClient.LogicalSwitchExists(lsName)
if err != nil {
return false, fmt.Errorf("check logical switch %s exist: %v", lsName, err)
}

if !exist {
return false, nil
}
}

return true, nil
}
85 changes: 85 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package controller

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"k8s.io/client-go/util/workqueue"

mockovs "github.com/kubeovn/kube-ovn/mocks/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned/fake"
informerfactory "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions/kubeovn/v1"
)

type fakeControllerInformers struct {
vpcInformer kubeovninformer.VpcInformer
sbunetInformer kubeovninformer.SubnetInformer
}

type fakeController struct {
fakeController *Controller
fakeinformers *fakeControllerInformers
mockOvnClient *mockovs.MockOvnClient
}

func alwaysReady() bool { return true }

func newFakeController(t *testing.T) *fakeController {
/* kube ovn fake client */
kubeovnClient := fake.NewSimpleClientset()
kubeovnInformerFactory := informerfactory.NewSharedInformerFactory(kubeovnClient, 0)
vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
sbunetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()

fakeInformers := &fakeControllerInformers{
vpcInformer: vpcInformer,
sbunetInformer: sbunetInformer,
}

/* ovn fake client */
mockOvnClient := mockovs.NewMockOvnClient(gomock.NewController(t))

ctrl := &Controller{
vpcsLister: vpcInformer.Lister(),
vpcSynced: alwaysReady,
subnetsLister: sbunetInformer.Lister(),
subnetSynced: alwaysReady,
ovnClient: mockOvnClient,
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""),
}

return &fakeController{
fakeController: ctrl,
fakeinformers: fakeInformers,
mockOvnClient: mockOvnClient,
}
}

func Test_allSubnetReady(t *testing.T) {
t.Parallel()

fakeController := newFakeController(t)
ctrl := fakeController.fakeController
mockOvnClient := fakeController.mockOvnClient

subnets := []string{"ovn-default", "join"}

t.Run("all subnet ready", func(t *testing.T) {
mockOvnClient.EXPECT().LogicalSwitchExists(gomock.Any()).Return(true, nil).Times(2)

ready, err := ctrl.allSubnetReady(subnets...)
require.NoError(t, err)
require.True(t, ready)
})

t.Run("some subnet are not ready", func(t *testing.T) {
mockOvnClient.EXPECT().LogicalSwitchExists(subnets[0]).Return(true, nil)
mockOvnClient.EXPECT().LogicalSwitchExists(subnets[1]).Return(false, nil)

ready, err := ctrl.allSubnetReady(subnets...)
require.NoError(t, err)
require.False(t, ready)
})
}
12 changes: 7 additions & 5 deletions pkg/controller/external-gw.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ func (c *Controller) removeExternalGateway() error {

if !keepExternalSubnet {
klog.Infof("delete external gateway switch %s", c.config.ExternalGatewaySwitch)
if err := c.ovnLegacyClient.DeleteGatewaySwitch(c.config.ExternalGatewaySwitch); err != nil {
klog.Errorf("failed to delete external gateway switch, %v", err)
if err := c.ovnClient.DeleteLogicalGatewaySwitch(util.ExternalGatewaySwitch, c.config.ClusterRouter); err != nil {
klog.Errorf("delete external gateway switch %s: %v", util.ExternalGatewaySwitch, err)
return err
}
} else {
klog.Infof("should keep provider network vlan underlay external gateway switch %s", c.config.ExternalGatewaySwitch)
lrpName := fmt.Sprintf("%s-%s", c.config.ClusterRouter, c.config.ExternalGatewaySwitch)
klog.Infof("delete logical router port %s", lrpName)
if err := c.ovnLegacyClient.DeleteLogicalRouterPort(lrpName); err != nil {
if err := c.ovnClient.DeleteLogicalRouterPort(lrpName); err != nil {
klog.Errorf("failed to delete lrp %s, %v", lrpName, err)
return err
}
Expand Down Expand Up @@ -160,10 +160,12 @@ func (c *Controller) establishExternalGateway(config map[string]string) error {
klog.Infof("lrp %s exist", lrpName)
return nil
}
if err := c.ovnLegacyClient.CreateGatewaySwitch(c.config.ExternalGatewaySwitch, c.config.ExternalGatewayNet, c.config.ExternalGatewayVlanID, lrpIp, lrpMac, chassises); err != nil {
klog.Errorf("failed to create external gateway switch, %v", err)

if err := c.ovnClient.CreateGatewayLogicalSwitch(c.config.ExternalGatewaySwitch, c.config.ClusterRouter, c.config.ExternalGatewayNet, lrpIp, lrpMac, c.config.ExternalGatewayVlanID, chassises...); err != nil {
klog.Errorf("create external gateway switch %s: %v", c.config.ExternalGatewaySwitch, err)
return err
}

return nil
}

Expand Down
60 changes: 34 additions & 26 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
"github.com/kubeovn/kube-ovn/pkg/util"
)

Expand Down Expand Up @@ -52,26 +53,17 @@ func (c *Controller) gcLogicalRouterPort() error {
return err
}

var exceptPeerPorts []string
exceptPeerPorts := make(map[string]struct{})
for _, vpc := range vpcs {
for _, peer := range vpc.Status.VpcPeerings {
exceptPeerPorts = append(exceptPeerPorts, fmt.Sprintf("%s-%s", vpc.Name, peer))
exceptPeerPorts[fmt.Sprintf("%s-%s", vpc.Name, peer)] = struct{}{}
}
}
lrps, err := c.ovnLegacyClient.ListLogicalEntity("logical_router_port", "peer!=[]")
if err != nil {
klog.Errorf("failed to list logical router port, %v", err)

if err = c.ovnClient.DeleteLogicalRouterPorts(nil, logicalRouterPortFilter(exceptPeerPorts)); err != nil {
klog.Errorf("delete non-existent peer logical router port: %v", err)
return err
}
for _, lrp := range lrps {
if !util.ContainsString(exceptPeerPorts, lrp) {
klog.Infof("gc logical router port %s", lrp)
if err = c.ovnLegacyClient.DeleteLogicalRouterPort(lrp); err != nil {
klog.Errorf("failed to delete logical router port %s, %v", lrp, err)
return err
}
}
}
return nil
}

Expand Down Expand Up @@ -132,25 +124,27 @@ func (c *Controller) gcLogicalSwitch() error {
subnetMap[s.Name] = s
subnetNames = append(subnetNames, s.Name)
}
lss, err := c.ovnLegacyClient.ListLogicalSwitch(c.config.EnableExternalVpc)

lss, err := c.ovnClient.ListLogicalSwitch(c.config.EnableExternalVpc, nil)
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
klog.Errorf("list logical switch: %v", err)
return err
}

klog.Infof("ls in ovn %v", lss)
klog.Infof("subnet in kubernetes %v", subnetNames)
for _, ls := range lss {
if ls == util.InterconnectionSwitch ||
ls == util.ExternalGatewaySwitch ||
ls == c.config.ExternalGatewaySwitch {
if ls.Name == util.InterconnectionSwitch ||
ls.Name == util.ExternalGatewaySwitch ||
ls.Name == c.config.ExternalGatewaySwitch {
continue
}
if s := subnetMap[ls]; s != nil && isOvnSubnet(s) {
if s := subnetMap[ls.Name]; s != nil && isOvnSubnet(s) {
continue
}

klog.Infof("gc subnet %s", ls)
if err := c.handleDeleteLogicalSwitch(ls); err != nil {
if err := c.handleDeleteLogicalSwitch(ls.Name); err != nil {
klog.Errorf("failed to gc subnet %s, %v", ls, err)
return err
}
Expand Down Expand Up @@ -190,20 +184,23 @@ func (c *Controller) gcCustomLogicalRouter() error {
for _, s := range vpcs {
vpcNames = append(vpcNames, s.Name)
}
lrs, err := c.ovnLegacyClient.ListLogicalRouter(c.config.EnableExternalVpc)

lrs, err := c.ovnClient.ListLogicalRouter(c.config.EnableExternalVpc, nil)
if err != nil {
klog.Errorf("failed to list logical router, %v", err)
return err
}

klog.Infof("lr in ovn %v", lrs)
klog.Infof("vpc in kubernetes %v", vpcNames)

for _, lr := range lrs {
if lr == util.DefaultVpc {
if lr.Name == util.DefaultVpc {
continue
}
if !util.IsStringIn(lr, vpcNames) {
if !util.IsStringIn(lr.Name, vpcNames) {
klog.Infof("gc router %s", lr)
if err := c.deleteVpcRouter(lr); err != nil {
if err := c.deleteVpcRouter(lr.Name); err != nil {
klog.Errorf("failed to delete router %s, %v", lr, err)
return err
}
Expand Down Expand Up @@ -359,10 +356,11 @@ func (c *Controller) markAndCleanLSP() error {
}

klog.Infof("gc logical switch port %s", lsp.Name)
if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(lsp.Name); err != nil {
if err := c.ovnClient.DeleteLogicalSwitchPort(lsp.Name); err != nil {
klog.Errorf("failed to delete lsp %s, %v", lsp, err)
return err
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), lsp.Name, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", lsp.Name, err)
Expand Down Expand Up @@ -842,3 +840,13 @@ func (c *Controller) gcVpcDns() error {
}
return nil
}

func logicalRouterPortFilter(exceptPeerPorts map[string]struct{}) func(lrp *ovnnb.LogicalRouterPort) bool {
return func(lrp *ovnnb.LogicalRouterPort) bool {
if _, ok := exceptPeerPorts[lrp.Name]; ok {
return false // ignore except lrp
}

return lrp.Peer != nil && len(*lrp.Peer) != 0
}
}
48 changes: 48 additions & 0 deletions pkg/controller/gc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package controller

import (
"fmt"
"testing"

"github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
"github.com/stretchr/testify/require"
)

func newLogicalRouterPort(lrName, lrpName, mac string, networks []string) *ovnnb.LogicalRouterPort {
return &ovnnb.LogicalRouterPort{
Name: lrpName,
MAC: mac,
Networks: networks,
ExternalIDs: map[string]string{
"lr": lrName,
},
}
}

func Test_logicalRouterPortFilter(t *testing.T) {
t.Parallel()

exceptPeerPorts := map[string]struct{}{
"except-lrp-0": {},
"except-lrp-1": {},
}

lrpNames := []string{"other-0", "other-1", "other-2", "except-lrp-0", "except-lrp-1"}
lrps := make([]*ovnnb.LogicalRouterPort, 0)
for _, lrpName := range lrpNames {
lrp := newLogicalRouterPort("", lrpName, "", nil)
peer := fmt.Sprintf("%s-peer", lrpName)
lrp.Peer = &peer
lrps = append(lrps, lrp)
}

filterFunc := logicalRouterPortFilter(exceptPeerPorts)

for _, lrp := range lrps {
if _, ok := exceptPeerPorts[lrp.Name]; ok {
require.False(t, filterFunc(lrp))
} else {
require.True(t, filterFunc(lrp))
}
}
}
16 changes: 3 additions & 13 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,7 @@ func (c *Controller) initNodeSwitch() error {

// InitClusterRouter init cluster router to connect different logical switches
func (c *Controller) initClusterRouter() error {
lrs, err := c.ovnLegacyClient.ListLogicalRouter(c.config.EnableExternalVpc)
if err != nil {
return err
}
klog.Infof("exists routers: %v", lrs)
for _, r := range lrs {
if c.config.ClusterRouter == r {
return nil
}
}
return c.ovnLegacyClient.CreateLogicalRouter(c.config.ClusterRouter)
return c.ovnClient.CreateLogicalRouter(c.config.ClusterRouter)
}

func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error {
Expand Down Expand Up @@ -798,8 +788,8 @@ func (c *Controller) initAppendLspExternalIds(portName string, pod *v1.Pod) erro
externalIDs["pod"] = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}

if err := c.ovnLegacyClient.SetLspExternalIds(portName, externalIDs); err != nil {
klog.Errorf("failed to set lsp external_ids for port %s: %v", portName, err)
if err := c.ovnClient.SetLogicalSwitchPortExternalIds(portName, externalIDs); err != nil {
klog.Errorf("set lsp external_ids for logical switch port %s: %v", portName, err)
return err
}

Expand Down
Loading