From 1fe6c44139fc3a15cafc61634dd0addd3d73f699 Mon Sep 17 00:00:00 2001 From: changluyi <47097611+changluyi@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:17:15 +0800 Subject: [PATCH] add mcast querier ip for multicast (#4375) * add mcast querier ip for multicast --------- Signed-off-by: clyi --- dist/images/install.sh | 4 + pkg/apis/kubeovn/v1/types.go | 2 + pkg/controller/ip.go | 3 + pkg/controller/subnet.go | 277 ++++++++++++++++++++++------- pkg/util/const.go | 2 + test/e2e/kube-ovn/subnet/subnet.go | 27 +++ 6 files changed, 247 insertions(+), 68 deletions(-) diff --git a/dist/images/install.sh b/dist/images/install.sh index 30bde657e10..051a5c69f08 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -2256,6 +2256,10 @@ spec: type: string u2oInterconnectionMAC: type: string + mcastQuerierIP: + type: string + mcastQuerierMAC: + type: string u2oInterconnectionVPC: type: string v4usingIPrange: diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index cda348c1b35..e53c4e8c579 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -245,6 +245,8 @@ type SubnetStatus struct { U2OInterconnectionMAC string `json:"u2oInterconnectionMAC"` U2OInterconnectionVPC string `json:"u2oInterconnectionVPC"` NatOutgoingPolicyRules []NatOutgoingPolicyRuleStatus `json:"natOutgoingPolicyRules"` + McastQuerierIP string `json:"mcastQuerierIP"` + McastQuerierMAC string `json:"mcastQuerierMAC"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/ip.go b/pkg/controller/ip.go index 46638347bf2..75e8894aecd 100644 --- a/pkg/controller/ip.go +++ b/pkg/controller/ip.go @@ -389,6 +389,9 @@ func (c *Controller) createOrUpdateIPCR(ipCRName, podName, ip, mac, subnetName, case strings.HasPrefix(podName, util.U2OInterconnName[0:19]): key = podName // interconn IP name ipName = podName + case strings.HasPrefix(podName, util.McastQuerierName[0:13]): + key = podName // mcast querier IP name + ipName = podName } } diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 0a8cd2fd4ee..47eefb25dca 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -683,14 +683,11 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err) return err } - - if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { - if err := c.reconcileU2OInterconnectionIP(subnet); err != nil { - klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err) - return err - } + _, isMcastQuerierChanged, err := c.reconcileSubnetSpecialIPs(subnet) + if err != nil { + klog.Errorf("failed to reconcile subnet %s Custom IPs %v", subnet.Name, err) + return err } - if err := c.checkSubnetConflict(subnet); err != nil { klog.Errorf("failed to check subnet %s, %v", subnet.Name, err) return err @@ -721,15 +718,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { return err } - multicastSnoopFlag := map[string]string{"mcast_snoop": "true", "mcast_querier": "false"} - if subnet.Spec.EnableMulticastSnoop { - if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil { - klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err) - return err - } - } else { - if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil { - klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err) + if isMcastQuerierChanged { + if err := c.handleMcastQuerierChange(subnet); err != nil { + klog.Errorf("failed to handle mcast querier IP change for subnet %s: %v", subnet.Name, err) return err } } @@ -1815,85 +1806,186 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error { return nil } -func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error { - needCalcIP := false - if subnet.Spec.U2OInterconnection { +func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, bool, error) { + isU2OIPChanged := false + isMcastQuerierIPChanged := false + var err error + + // reconcile u2o IP + if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name) u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name) - var v4ip, v6ip, mac string - var err error - if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") { - v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName) + var v4ip, v6ip string + if subnet.Spec.U2OInterconnection { + v4ip, v6ip, _, err = c.acquireU2OIP(subnet, u2oInterconnName, u2oInterconnLrpName) if err != nil { - klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err) - return err - } - } else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP { - if subnet.Status.U2OInterconnectionIP != "" { - klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name) - c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name) + return isU2OIPChanged, isMcastQuerierIPChanged, err } - v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP) + if v4ip != "" || v6ip != "" { + isU2OIPChanged = true + } + } else if subnet.Status.U2OInterconnectionIP != "" { + err = c.releaseU2OIP(subnet, u2oInterconnName) if err != nil { - klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err) - return err + return isU2OIPChanged, isMcastQuerierIPChanged, err } + isU2OIPChanged = true } - if v4ip != "" || v6ip != "" { - switch subnet.Spec.Protocol { - case kubeovnv1.ProtocolIPv4: - subnet.Status.U2OInterconnectionIP = v4ip - case kubeovnv1.ProtocolIPv6: - subnet.Status.U2OInterconnectionIP = v6ip - case kubeovnv1.ProtocolDual: - subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip) - } - if err := c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", ""); err != nil { - klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err) - return err - } - - subnet.Status.U2OInterconnectionMAC = mac - needCalcIP = true + if isU2OIPChanged { + klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s", + subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP) } - } else if subnet.Status.U2OInterconnectionIP != "" { - u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name) - klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name) - c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name) - subnet.Status.U2OInterconnectionIP = "" - subnet.Status.U2OInterconnectionMAC = "" - subnet.Status.U2OInterconnectionVPC = "" + } - if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}); err != nil { - if !k8serrors.IsNotFound(err) { - klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err) - return err - } + // reconcile mcast querier IP + if subnet.Spec.EnableMulticastSnoop { + isMcastQuerierIPChanged, err = c.acquireMcastQuerierIP(subnet) + if err != nil { + return isU2OIPChanged, isMcastQuerierIPChanged, err + } + } else { + isMcastQuerierIPChanged, err = c.releaseMcastQuerierIP(subnet) + if err != nil { + return isU2OIPChanged, isMcastQuerierIPChanged, err } - - needCalcIP = true } - if needCalcIP { - klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s", - subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP) + // caculate subnet status + if isU2OIPChanged || isMcastQuerierIPChanged { if subnet.Spec.Protocol == kubeovnv1.ProtocolDual { if _, err := c.calcDualSubnetStatusIP(subnet); err != nil { klog.Error(err) - return err + return isU2OIPChanged, isMcastQuerierIPChanged, err } } else { if _, err := c.calcSubnetStatusIP(subnet); err != nil { klog.Error(err) - return err + return isU2OIPChanged, isMcastQuerierIPChanged, err } } } + + return isU2OIPChanged, isMcastQuerierIPChanged, nil +} + +func (c *Controller) acquireU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName, u2oInterconnLrpName string) (string, string, string, error) { + var v4ip, v6ip, mac string + var err error + if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") { + v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName) + if err != nil { + klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err) + return "", "", "", err + } + } else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP { + if subnet.Status.U2OInterconnectionIP != "" { + klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name) + c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name) + } + v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP) + if err != nil { + klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err) + return "", "", "", err + } + } + if v4ip != "" || v6ip != "" { + switch subnet.Spec.Protocol { + case kubeovnv1.ProtocolIPv4: + subnet.Status.U2OInterconnectionIP = v4ip + case kubeovnv1.ProtocolIPv6: + subnet.Status.U2OInterconnectionIP = v6ip + case kubeovnv1.ProtocolDual: + subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip) + } + err = c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", "") + if err != nil { + klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err) + return "", "", "", err + } + subnet.Status.U2OInterconnectionMAC = mac + } + return v4ip, v6ip, mac, nil +} + +func (c *Controller) releaseU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName string) error { + klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name) + c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name) + subnet.Status.U2OInterconnectionIP = "" + subnet.Status.U2OInterconnectionMAC = "" + subnet.Status.U2OInterconnectionVPC = "" + + err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err) + return err + } + return nil } +func (c *Controller) acquireMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) { + isMcastQuerierChanged := false + mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name) + var v4ip, v6ip, mac string + var err error + + if subnet.Status.McastQuerierIP == "" || subnet.Status.McastQuerierMAC == "" { + v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, mcastQuerierLspName, mcastQuerierLspName) + if err != nil { + klog.Errorf("failed to acquire mcast querier ip address for subnet %s, %v", subnet.Name, err) + return isMcastQuerierChanged, err + } + } + + if v4ip != "" || v6ip != "" { + switch subnet.Spec.Protocol { + case kubeovnv1.ProtocolIPv4: + subnet.Status.McastQuerierIP = v4ip + case kubeovnv1.ProtocolIPv6: + subnet.Status.McastQuerierIP = v6ip + case kubeovnv1.ProtocolDual: + subnet.Status.McastQuerierIP = fmt.Sprintf("%s,%s", v4ip, v6ip) + } + + err := c.createOrUpdateIPCR("", mcastQuerierLspName, subnet.Status.McastQuerierIP, mac, subnet.Name, "default", "", "") + if err != nil { + klog.Errorf("failed to create or update IPs of %s : %v", mcastQuerierLspName, err) + return isMcastQuerierChanged, err + } + + subnet.Status.McastQuerierMAC = mac + klog.Infof("reconcile subnet %s mcast querier IP %s mac %s", + subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC) + isMcastQuerierChanged = true + } + + return isMcastQuerierChanged, nil +} + +func (c *Controller) releaseMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) { + isMcastQuerierChanged := false + if subnet.Status.McastQuerierIP != "" { + mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name) + klog.Infof("release mcast querier ip address %s for subnet %s", subnet.Status.McastQuerierIP, subnet.Name) + c.ipam.ReleaseAddressByPod(mcastQuerierLspName, subnet.Name) + subnet.Status.McastQuerierIP = "" + subnet.Status.McastQuerierMAC = "" + + if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), mcastQuerierLspName, metav1.DeleteOptions{}); err != nil { + if !k8serrors.IsNotFound(err) { + klog.Errorf("failed to delete ip %s, %v", mcastQuerierLspName, err) + return isMcastQuerierChanged, err + } + } + isMcastQuerierChanged = true + klog.Infof("reconcile subnet %s mcast querier IP %s mac %s", + subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC) + } + return isMcastQuerierChanged, nil +} + func (c *Controller) calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet) (*kubeovnv1.Subnet, error) { if err := util.CheckCidrs(subnet.Spec.CIDRBlock); err != nil { return nil, err @@ -3133,3 +3225,52 @@ func (c *Controller) findSubnetByNetworkAttachmentDefinition(ns, name string, su return subnet, nil } + +func (c *Controller) handleMcastQuerierChange(subnet *kubeovnv1.Subnet) error { + if subnet.Spec.EnableMulticastSnoop { + multicastSnoopFlag := map[string]string{ + "mcast_snoop": "true", + "mcast_querier": "true", + "mcast_ip4_src": subnet.Status.McastQuerierIP, + "mcast_eth_src": subnet.Status.McastQuerierMAC, + } + mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name) + if err := c.OVNNbClient.CreateLogicalSwitchPort(subnet.Name, mcastQuerierLspName, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC, mcastQuerierLspName, "default", false, "", "", false, nil, ""); err != nil { + err = fmt.Errorf("failed to create mcast querier lsp %s: %w", mcastQuerierLspName, err) + klog.Error(err) + return err + } + + if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil { + klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err) + return err + } + } else { + lss, err := c.OVNNbClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool { + return ls.Name == subnet.Name + }) + if err != nil || len(lss) == 0 { + klog.Errorf("failed to list logical switch %s: %v", subnet.Name, err) + return err + } + + multicastSnoopFlag := map[string]string{ + "mcast_snoop": lss[0].OtherConfig["mcast_snoop"], + "mcast_querier": lss[0].OtherConfig["mcast_querier"], + "mcast_ip4_src": lss[0].OtherConfig["mcast_ip4_src"], + "mcast_eth_src": lss[0].OtherConfig["mcast_eth_src"], + } + mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name) + if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil { + klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err) + return err + } + + if err := c.OVNNbClient.DeleteLogicalSwitchPort(mcastQuerierLspName); err != nil { + err = fmt.Errorf("failed to delete mcast querier lsp %s: %w", mcastQuerierLspName, err) + klog.Error(err) + return err + } + } + return nil +} diff --git a/pkg/util/const.go b/pkg/util/const.go index d401efa8369..2d7af73ffad 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -266,6 +266,8 @@ const ( U2OInterconnName = "u2o-interconnection.%s.%s" U2OExcludeIPAg = "%s.u2o_exclude_ip.%s" + McastQuerierName = "mcast-querier.%s" + DefaultServiceSessionStickinessTimeout = 10800 OvnSubnetGatewayIptables = "ovn-subnet-gateway" diff --git a/test/e2e/kube-ovn/subnet/subnet.go b/test/e2e/kube-ovn/subnet/subnet.go index 61d75544b92..4c16c98f6a1 100644 --- a/test/e2e/kube-ovn/subnet/subnet.go +++ b/test/e2e/kube-ovn/subnet/subnet.go @@ -1168,6 +1168,33 @@ var _ = framework.Describe("[group:subnet]", func() { } }) + framework.ConformanceIt("Should support subnet multicast snoop", func() { + f.SkipVersionPriorTo(1, 13, "subnet multicast snoop is introduced in v1.13") + ginkgo.By("Creating subnet " + subnetName) + subnet = framework.MakeSubnet(subnetName, "", cidr, "", "", "", nil, nil, nil) + subnet = subnetClient.CreateSync(subnet) + + ginkgo.By("Checking subnet multicast snoop enable " + subnetName) + subnet = subnetClient.Get(subnetName) + modifiedSubnet := subnet.DeepCopy() + modifiedSubnet.Spec.EnableMulticastSnoop = true + subnetClient.PatchSync(subnet, modifiedSubnet) + + subnet = subnetClient.Get(subnetName) + framework.ExpectNotEmpty(subnet.Status.McastQuerierIP) + framework.ExpectNotEmpty(subnet.Status.McastQuerierMAC) + + ginkgo.By("Checking subnet multicast snoop disable " + subnetName) + subnet = subnetClient.Get(subnetName) + modifiedSubnet = subnet.DeepCopy() + modifiedSubnet.Spec.EnableMulticastSnoop = false + subnetClient.PatchSync(subnet, modifiedSubnet) + + subnet = subnetClient.Get(subnetName) + framework.ExpectEmpty(subnet.Status.McastQuerierIP) + framework.ExpectEmpty(subnet.Status.McastQuerierMAC) + }) + framework.ConformanceIt("should support subnet add nat outgoing policy rules", func() { f.SkipVersionPriorTo(1, 12, "Support for subnet add nat outgoing policy rules in v1.12")