Skip to content

Commit

Permalink
Fix change subnet (#4088)
Browse files Browse the repository at this point in the history
* couting vip ovn eip  add and del



---------

Signed-off-by: bobz965 <[email protected]>
  • Loading branch information
bobz965 committed May 29, 2024
1 parent 5fe053e commit 55dbfbb
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 102 deletions.
3 changes: 3 additions & 0 deletions charts/kube-ovn/templates/kube-ovn-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,9 @@ spec:
- jsonPath: .status.ready
name: Ready
type: boolean
- jsonPath: .spec.externalSubnet
name: ExternalSubnet
type: string
schema:
openAPIV3Schema:
type: object
Expand Down
3 changes: 3 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,9 @@ spec:
- jsonPath: .status.ready
name: Ready
type: boolean
- jsonPath: .spec.externalSubnet
name: ExternalSubnet
type: string
schema:
openAPIV3Schema:
type: object
Expand Down
72 changes: 44 additions & 28 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,31 @@ func (c *Controller) enqueueUpdateIP(oldObj, newObj interface{}) {
c.updateSubnetStatusQueue.Add(as)
}
}
// ip can not change these specs below
if oldIP.Spec.Subnet != "" && newIP.Spec.Subnet != oldIP.Spec.Subnet {
klog.Errorf("ip %s subnet can not change", newIP.Name)
}
if oldIP.Spec.Namespace != "" && newIP.Spec.Namespace != oldIP.Spec.Namespace {
klog.Errorf("ip %s namespace can not change", newIP.Name)
}
if oldIP.Spec.PodName != "" && newIP.Spec.PodName != oldIP.Spec.PodName {
klog.Errorf("ip %s podName can not change", newIP.Name)
}
if oldIP.Spec.PodType != "" && newIP.Spec.PodType != oldIP.Spec.PodType {
klog.Errorf("ip %s podType can not change", newIP.Name)
}
if oldIP.Spec.MacAddress != "" && newIP.Spec.MacAddress != oldIP.Spec.MacAddress {
klog.Errorf("ip %s macAddress can not change", newIP.Name)
}
if oldIP.Spec.NodeName != "" && newIP.Spec.NodeName != oldIP.Spec.NodeName {
klog.Errorf("ip %s nodeName can not change", newIP.Name)
}
if oldIP.Spec.V4IPAddress != "" && newIP.Spec.V4IPAddress != oldIP.Spec.V4IPAddress {
klog.Errorf("ip %s v4IPAddress can not change", newIP.Name)
}
if oldIP.Spec.V6IPAddress != "" && newIP.Spec.V6IPAddress != oldIP.Spec.V6IPAddress {
klog.Errorf("ip %s v6IPAddress can not change", newIP.Name)
}
}

func (c *Controller) enqueueDelIP(obj interface{}) {
Expand Down Expand Up @@ -286,49 +311,40 @@ func (c *Controller) handleUpdateIP(key string) error {
klog.Errorf("failed to get subnet %s: %v", cachedIP.Spec.Subnet, err)
return err
}
cleanIPAM := true
if isOvnSubnet(subnet) {
portName := cachedIP.Name
port, err := c.OVNNbClient.GetLogicalSwitchPort(portName, true)
if err != nil {
klog.Errorf("failed to get logical switch port %s: %v", portName, err)
return err
}
if port != nil && len(port.Addresses) > 0 {
address := port.Addresses[0]
if strings.Contains(address, cachedIP.Spec.MacAddress) {
klog.Infof("delete ip cr lsp %s from switch %s", portName, subnet.Name)
if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
klog.Errorf("failed to delete ip cr lsp %s from switch %s: %v", portName, subnet.Name, err)
return err
}
klog.V(3).Infof("sync sg for deleted port %s", portName)
sgList, err := c.getPortSg(port)
if err != nil {
klog.Errorf("get port sg failed, %v", err)
return err
}
for _, sgName := range sgList {
if sgName != "" {
c.syncSgPortsQueue.Add(sgName)
}
if port != nil {
klog.Infof("delete ip cr lsp %s from switch %s", portName, subnet.Name)
if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
klog.Errorf("failed to delete ip cr lsp %s from switch %s: %v", portName, subnet.Name, err)
return err
}
klog.V(3).Infof("sync sg for deleted port %s", portName)
sgList, err := c.getPortSg(port)
if err != nil {
klog.Errorf("get port sg failed, %v", err)
return err
}
for _, sgName := range sgList {
if sgName != "" {
c.syncSgPortsQueue.Add(sgName)
}
} else {
// ip subnet changed in pod handle add or update pod process
klog.Infof("lsp %s ip changed, only delete old ip cr %s", portName, key)
cleanIPAM = false
}
}
}
if cleanIPAM {
podKey := fmt.Sprintf("%s/%s", cachedIP.Spec.Namespace, cachedIP.Spec.PodName)
klog.Infof("ip cr %s release ipam pod key %s from subnet %s", cachedIP.Name, podKey, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(podKey, cachedIP.Spec.Subnet)
}
podKey := fmt.Sprintf("%s/%s", cachedIP.Spec.Namespace, cachedIP.Spec.PodName)
klog.Infof("ip cr %s release ipam pod key %s from subnet %s", cachedIP.Name, podKey, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(podKey, cachedIP.Spec.Subnet)
if err = c.handleDelIPFinalizer(cachedIP); err != nil {
klog.Errorf("failed to handle del ip finalizer %v", err)
return err
}
c.updateSubnetStatusQueue.Add(cachedIP.Spec.Subnet)
}
return nil
}
Expand Down
39 changes: 0 additions & 39 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,39 +534,6 @@ func (c *Controller) getPodKubeovnNets(pod *v1.Pod) ([]*kubeovnNet, error) {
return podNets, nil
}

func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName string) error {
ipName := ovs.PodNameToPortName(vmName, namespace, providerName)
ipCR, err := c.ipsLister.Get(ipName)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
err := fmt.Errorf("failed to get ip CR %s: %v", ipName, err)
klog.Error(err)
return err
}
if ipCR.Spec.Subnet != subnetName {
key := fmt.Sprintf("%s/%s", namespace, vmName)
klog.Infof("release ipam for vm %s from old subnet %s", key, ipCR.Spec.Subnet)
c.ipam.ReleaseAddressByPod(key, ipCR.Spec.Subnet)
klog.Infof("gc logical switch port %s", key)
if err := c.OVNNbClient.DeleteLogicalSwitchPort(key); err != nil {
klog.Errorf("failed to delete lsp %s, %v", key, err)
return err
}
// old lsp has been deleted, delete old ip cr
if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), ipName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", ipName, err)
return err
}
}
c.updateSubnetStatusQueue.Add(ipCR.Spec.Subnet)
// handleAddOrUpdatePod will create new lsp and new ip cr
}
return nil
}

func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -679,15 +646,9 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
delete(pod.Annotations, fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName))
}
pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] = "true"

if vmKey != "" {
pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, podNet.ProviderName)] = vmName
if err := c.changeVMSubnet(vmName, namespace, podNet.ProviderName, subnet.Name); err != nil {
klog.Errorf("vm %s change subnet to %s failed: %v", vmKey, subnet.Name, err)
return nil, err
}
}

if err := util.ValidateNetworkBroadcast(podNet.Subnet.Spec.CIDRBlock, ipStr); err != nil {
klog.Errorf("validate pod %s/%s failed: %v", namespace, name, err)
c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error())
Expand Down
97 changes: 74 additions & 23 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (c *Controller) handleSubnetFinalizer(subnet *kubeovnv1.Subnet) (bool, erro
return false, nil
}

func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason, errStr string) {
func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason, errStr string) error {
if errStr != "" {
subnet.Status.SetError(reason, errStr)
subnet.Status.NotValidated(reason, errStr)
Expand All @@ -557,13 +557,16 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason, errStr s
}
}

if bytes, err := subnet.Status.Bytes(); err != nil {
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Error(err)
} else {
if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
klog.Error("patch subnet status failed", err)
}
return err
}
if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
klog.Errorf("failed to patch status for subnet %s, %v", subnet.Name, err)
return err
}
return nil
}

func (c *Controller) validateVpcBySubnet(subnet *kubeovnv1.Subnet) (*kubeovnv1.Vpc, error) {
Expand Down Expand Up @@ -621,15 +624,21 @@ func (c *Controller) checkSubnetConflict(subnet *kubeovnv1.Subnet) error {
if util.CIDROverlap(sub.Spec.CIDRBlock, subnet.Spec.CIDRBlock) {
err = fmt.Errorf("subnet %s cidr %s is conflict with subnet %s cidr %s", subnet.Name, subnet.Spec.CIDRBlock, sub.Name, sub.Spec.CIDRBlock)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
return err
}

if subnet.Spec.ExternalEgressGateway != "" && sub.Spec.ExternalEgressGateway != "" &&
subnet.Spec.PolicyRoutingTableID == sub.Spec.PolicyRoutingTableID {
err = fmt.Errorf("subnet %s policy routing table ID %d is conflict with subnet %s policy routing table ID %d", subnet.Name, subnet.Spec.PolicyRoutingTableID, sub.Name, sub.Spec.PolicyRoutingTableID)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
return err
}
}
Expand All @@ -645,7 +654,10 @@ func (c *Controller) checkSubnetConflict(subnet *kubeovnv1.Subnet) error {
if addr.Type == v1.NodeInternalIP && util.CIDRContainIP(subnet.Spec.CIDRBlock, addr.Address) {
err = fmt.Errorf("subnet %s cidr %s conflict with node %s address %s", subnet.Name, subnet.Spec.CIDRBlock, node.Name, addr.Address)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
return err
}
}
Expand Down Expand Up @@ -736,10 +748,16 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {

if err = util.ValidateSubnet(*subnet); err != nil {
klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
return err
}
if err = c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", ""); err != nil {
klog.Error(err)
return err
}
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
klog.Error(err)
Expand Down Expand Up @@ -768,7 +786,10 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {

if !isOvnSubnet(subnet) {
// subnet provider is not ovn, and vpc is empty, should not reconcile
c.patchSubnetStatus(subnet, "SetNonOvnSubnetSuccess", "")
if err = c.patchSubnetStatus(subnet, "SetNonOvnSubnetSuccess", ""); err != nil {
klog.Error(err)
return err
}

subnet.Status.EnsureStandardConditions()
klog.Infof("non ovn subnet %s is ready", subnet.Name)
Expand Down Expand Up @@ -848,7 +869,10 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
if subnet.Spec.EnableLb != nil && *subnet.Spec.EnableLb {
if err := c.OVNNbClient.LogicalSwitchUpdateLoadBalancers(subnet.Name, ovsdb.MutateOperationInsert, lbs...); err != nil {
c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
klog.Error(err)
return err
}
Expand Down Expand Up @@ -877,25 +901,40 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {

if subnet.Spec.Private {
if err := c.OVNNbClient.SetLogicalSwitchPrivate(subnet.Name, subnet.Spec.CIDRBlock, c.config.NodeSwitchCIDR, subnet.Spec.AllowSubnets); err != nil {
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
klog.Error(err)
return err
}

c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchSuccess", "")
if err = c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchSuccess", ""); err != nil {
klog.Error(err)
return err
}
} else {
// clear acl when direction is ""
if err = c.OVNNbClient.DeleteAcls(subnet.Name, logicalSwitchKey, "", nil); err != nil {
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
klog.Error(err)
return err
}

c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclSuccess", "")
if err = c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclSuccess", ""); err != nil {
klog.Error(err)
return err
}
}

if err := c.OVNNbClient.UpdateLogicalSwitchACL(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Acls, subnet.Spec.AllowEWTraffic); err != nil {
c.patchSubnetStatus(subnet, "SetLogicalSwitchAclsFailed", err.Error())
if err = c.patchSubnetStatus(subnet, "SetLogicalSwitchAclsFailed", err.Error()); err != nil {
klog.Error(err)
return err
}
klog.Error(err)
return err
}
Expand Down Expand Up @@ -1441,7 +1480,10 @@ func (c *Controller) reconcileDistributedSubnetRouteInDefaultVpc(subnet *kubeovn
return err
}
subnet.Status.ActivateGateway = ""
c.patchSubnetStatus(subnet, "ChangeToDistributedGw", "")
if err := c.patchSubnetStatus(subnet, "ChangeToDistributedGw", ""); err != nil {
klog.Error(err)
return err
}
}

nodes, err := c.nodesLister.List(labels.Everything())
Expand Down Expand Up @@ -1592,7 +1634,10 @@ func (c *Controller) reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet *
if newActivateNode == "" {
klog.Warningf("all gateways of subnet %s are not ready", subnet.Name)
subnet.Status.ActivateGateway = newActivateNode
c.patchSubnetStatus(subnet, "NoActiveGatewayFound", fmt.Sprintf("subnet %s gws are not ready", subnet.Name))
if err := c.patchSubnetStatus(subnet, "NoActiveGatewayFound", fmt.Sprintf("subnet %s gws are not ready", subnet.Name)); err != nil {
klog.Error(err)
return err
}

return fmt.Errorf("subnet %s gws are not ready", subnet.Name)
}
Expand All @@ -1604,7 +1649,10 @@ func (c *Controller) reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet *
return err
}
subnet.Status.ActivateGateway = newActivateNode
c.patchSubnetStatus(subnet, "ReconcileCentralizedGatewaySuccess", "")
if err := c.patchSubnetStatus(subnet, "ReconcileCentralizedGatewaySuccess", ""); err != nil {
klog.Error(err)
return err
}

klog.Infof("delete old distributed policy route for subnet %s", subnet.Name)
if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil {
Expand Down Expand Up @@ -1774,8 +1822,10 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error
// centralized subnet
if subnet.Spec.GatewayNode == "" {
subnet.Status.NotReady("NoReadyGateway", "")
c.patchSubnetStatus(subnet, "NoReadyGateway", "")

if err := c.patchSubnetStatus(subnet, "NoReadyGateway", ""); err != nil {
klog.Error(err)
return err
}
err := fmt.Errorf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name)
klog.Error(err)
return err
Expand Down Expand Up @@ -1949,6 +1999,7 @@ func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) err
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionVPC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
Expand Down
Loading

0 comments on commit 55dbfbb

Please sign in to comment.