Skip to content

Commit

Permalink
refactor codes
Browse files Browse the repository at this point in the history
Signed-off-by: clyi <[email protected]>
  • Loading branch information
changluyi committed Sep 10, 2024
1 parent ba16cfa commit b8ed123
Showing 1 changed file with 200 additions and 136 deletions.
336 changes: 200 additions & 136 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return err
}

isMcastQuerierChanged, err := c.reconcileCustomIPs(subnet)
_, isMcastQuerierChanged, err := c.reconcileSubnetSpecialIPs(subnet)
if err != nil {
klog.Errorf("failed to reconcile subnet %s Custom IPs %v", subnet.Name, err)
return err
Expand Down Expand Up @@ -850,40 +850,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if isMcastQuerierChanged {
if subnet.Spec.EnableMulticastSnoop {
multicastSnoopFlag := map[string]string{"mcast_snoop": "true", "mcast_querier": "true", "mcast_ip4_src": subnet.Status.McastQuerierIP, "mcast_eth_src": subnet.Status.McastQuerierMAC}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.CreateLogicalSwitchPort(subnet.Name, mcastQuerierLspName, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC, mcastQuerierLspName, "default", false, "", "", false, nil, ""); err != nil {
err = fmt.Errorf("failed to create mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}

if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
lss, err := c.OVNNbClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool {
return ls.Name == subnet.Name
})
if err != nil || len(lss) == 0 {
klog.Errorf("failed to list logical switch %s: %v", subnet.Name, err)
return err
}

multicastSnoopFlag := map[string]string{"mcast_snoop": lss[0].OtherConfig["mcast_snoop"], "mcast_querier": lss[0].OtherConfig["mcast_querier"], "mcast_ip4_src": lss[0].OtherConfig["mcast_ip4_src"], "mcast_eth_src": lss[0].OtherConfig["mcast_eth_src"]}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}

if err := c.OVNNbClient.DeleteLogicalSwitchPort(mcastQuerierLspName); err != nil {
err = fmt.Errorf("failed to delete mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}
if err := c.handleMcastQuerierChange(subnet); err != nil {
klog.Errorf("failed to handle mcast querier IP change for subnet %s: %v", subnet.Name, err)
return err
}
}

Expand Down Expand Up @@ -1968,139 +1937,185 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
return nil
}

func (c *Controller) reconcileCustomIPs(subnet *kubeovnv1.Subnet) (bool, error) {
needCalcIP := false
isMcastQuerierChanged := false
func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, bool, error) {
isU2OIPChanged := false
isMcastQuerierIPChanged := false
var err error

// reconcile u2o IP
if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
isU2OIPChanged := false
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name)
var v4ip, v6ip string
if subnet.Spec.U2OInterconnection {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name)
var v4ip, v6ip, mac string
var err error
if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
}

v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
v4ip, v6ip, _, err = c.acquireU2OIPAddress(subnet, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
if err := c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", ""); err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return isMcastQuerierChanged, err
}

subnet.Status.U2OInterconnectionMAC = mac
needCalcIP = true
isU2OIPChanged = true
}
} else if subnet.Status.U2OInterconnectionIP != "" {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return isMcastQuerierChanged, err
}
err = c.releaseU2OIPAddress(subnet, u2oInterconnName)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
needCalcIP = true
isU2OIPChanged = true
}

if needCalcIP {
if isU2OIPChanged {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
}
}

// reconcile mcast querier IP
if subnet.Spec.EnableMulticastSnoop {
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
var v4ip, v6ip, mac string
var err error
if subnet.Status.McastQuerierIP == "" || subnet.Status.McastQuerierMAC == "" {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, mcastQuerierLspName, mcastQuerierLspName)
if err != nil {
klog.Errorf("failed to acquire mcast querier ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
}
if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.McastQuerierIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.McastQuerierIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.McastQuerierIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
if err := c.createOrUpdateIPCR("", mcastQuerierLspName, subnet.Status.McastQuerierIP, mac, subnet.Name, "default", "", ""); err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}

subnet.Status.McastQuerierMAC = mac
needCalcIP = true
isMcastQuerierChanged = true
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
isMcastQuerierIPChanged, err = c.acquireMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
if subnet.Status.McastQuerierIP != "" {
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
klog.Infof("release mcast querier ip address %s for subnet %s", subnet.Status.McastQuerierIP, subnet.Name)
c.ipam.ReleaseAddressByPod(mcastQuerierLspName, subnet.Name)
subnet.Status.McastQuerierIP = ""
subnet.Status.McastQuerierMAC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), mcastQuerierLspName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}
}
needCalcIP = true
isMcastQuerierChanged = true
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
isMcastQuerierIPChanged, err = c.releaseMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
}

if needCalcIP {
// caculate subnet status
if isU2OIPChanged || isMcastQuerierIPChanged {
if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
if _, err := c.calcDualSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return isMcastQuerierChanged, err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
if _, err := c.calcSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
}
}

return isU2OIPChanged, isMcastQuerierIPChanged, nil
}

func (c *Controller) acquireU2OIPAddress(subnet *kubeovnv1.Subnet, u2oInterconnName, u2oInterconnLrpName string) (string, string, string, error) {
var v4ip, v6ip, mac string
var err error

if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
}

v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
}

switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}

err = c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return "", "", "", err
}
subnet.Status.U2OInterconnectionMAC = mac
return v4ip, v6ip, mac, nil
}

func (c *Controller) releaseU2OIPAddress(subnet *kubeovnv1.Subnet, u2oInterconnName string) error {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""

err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return err
}

return nil
}

func (c *Controller) acquireMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
var v4ip, v6ip, mac string
var err error

if subnet.Status.McastQuerierIP == "" || subnet.Status.McastQuerierMAC == "" {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, mcastQuerierLspName, mcastQuerierLspName)
if err != nil {
klog.Errorf("failed to acquire mcast querier ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.McastQuerierIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.McastQuerierIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.McastQuerierIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}

err := c.createOrUpdateIPCR("", mcastQuerierLspName, subnet.Status.McastQuerierIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}

subnet.Status.McastQuerierMAC = mac
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
isMcastQuerierChanged = true
}

return isMcastQuerierChanged, nil
}

func (c *Controller) releaseMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
if subnet.Status.McastQuerierIP != "" {
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
klog.Infof("release mcast querier ip address %s for subnet %s", subnet.Status.McastQuerierIP, subnet.Name)
c.ipam.ReleaseAddressByPod(mcastQuerierLspName, subnet.Name)
subnet.Status.McastQuerierIP = ""
subnet.Status.McastQuerierMAC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), mcastQuerierLspName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}
}
isMcastQuerierChanged = true
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
}
return isMcastQuerierChanged, nil
}
Expand Down Expand Up @@ -3344,3 +3359,52 @@ func (c *Controller) findSubnetByNetworkAttachmentDefinition(ns, name string, su

return subnet, nil
}

func (c *Controller) handleMcastQuerierChange(subnet *kubeovnv1.Subnet) error {
if subnet.Spec.EnableMulticastSnoop {
multicastSnoopFlag := map[string]string{
"mcast_snoop": "true",
"mcast_querier": "true",
"mcast_ip4_src": subnet.Status.McastQuerierIP,
"mcast_eth_src": subnet.Status.McastQuerierMAC,
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.CreateLogicalSwitchPort(subnet.Name, mcastQuerierLspName, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC, mcastQuerierLspName, "default", false, "", "", false, nil, ""); err != nil {
err = fmt.Errorf("failed to create mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}

if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
lss, err := c.OVNNbClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool {
return ls.Name == subnet.Name
})
if err != nil || len(lss) == 0 {
klog.Errorf("failed to list logical switch %s: %v", subnet.Name, err)
return err
}

multicastSnoopFlag := map[string]string{
"mcast_snoop": lss[0].OtherConfig["mcast_snoop"],
"mcast_querier": lss[0].OtherConfig["mcast_querier"],
"mcast_ip4_src": lss[0].OtherConfig["mcast_ip4_src"],
"mcast_eth_src": lss[0].OtherConfig["mcast_eth_src"],
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}

if err := c.OVNNbClient.DeleteLogicalSwitchPort(mcastQuerierLspName); err != nil {
err = fmt.Errorf("failed to delete mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}
}
return nil
}

0 comments on commit b8ed123

Please sign in to comment.