Skip to content

Commit

Permalink
add mcast querier ip for multicast (kubeovn#4375)
Browse files Browse the repository at this point in the history
* add mcast querier ip for multicast

---------

Signed-off-by: clyi <[email protected]>
  • Loading branch information
changluyi authored and zcq98 committed Sep 12, 2024
1 parent b0bb154 commit 1fe6c44
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 68 deletions.
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,10 @@ spec:
type: string
u2oInterconnectionMAC:
type: string
mcastQuerierIP:
type: string
mcastQuerierMAC:
type: string
u2oInterconnectionVPC:
type: string
v4usingIPrange:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ type SubnetStatus struct {
U2OInterconnectionMAC string `json:"u2oInterconnectionMAC"`
U2OInterconnectionVPC string `json:"u2oInterconnectionVPC"`
NatOutgoingPolicyRules []NatOutgoingPolicyRuleStatus `json:"natOutgoingPolicyRules"`
McastQuerierIP string `json:"mcastQuerierIP"`
McastQuerierMAC string `json:"mcastQuerierMAC"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ func (c *Controller) createOrUpdateIPCR(ipCRName, podName, ip, mac, subnetName,
case strings.HasPrefix(podName, util.U2OInterconnName[0:19]):
key = podName // interconn IP name
ipName = podName
case strings.HasPrefix(podName, util.McastQuerierName[0:13]):
key = podName // mcast querier IP name
ipName = podName
}
}

Expand Down
277 changes: 209 additions & 68 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,14 +683,11 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err)
return err
}

if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
if err := c.reconcileU2OInterconnectionIP(subnet); err != nil {
klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err)
return err
}
_, isMcastQuerierChanged, err := c.reconcileSubnetSpecialIPs(subnet)
if err != nil {
klog.Errorf("failed to reconcile subnet %s Custom IPs %v", subnet.Name, err)
return err
}

if err := c.checkSubnetConflict(subnet); err != nil {
klog.Errorf("failed to check subnet %s, %v", subnet.Name, err)
return err
Expand Down Expand Up @@ -721,15 +718,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return err
}

multicastSnoopFlag := map[string]string{"mcast_snoop": "true", "mcast_querier": "false"}
if subnet.Spec.EnableMulticastSnoop {
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
if isMcastQuerierChanged {
if err := c.handleMcastQuerierChange(subnet); err != nil {
klog.Errorf("failed to handle mcast querier IP change for subnet %s: %v", subnet.Name, err)
return err
}
}
Expand Down Expand Up @@ -1815,85 +1806,186 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
return nil
}

func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {
needCalcIP := false
if subnet.Spec.U2OInterconnection {
func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, bool, error) {
isU2OIPChanged := false
isMcastQuerierIPChanged := false
var err error

// reconcile u2o IP
if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name)
var v4ip, v6ip, mac string
var err error
if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
var v4ip, v6ip string
if subnet.Spec.U2OInterconnection {
v4ip, v6ip, _, err = c.acquireU2OIP(subnet, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
return isU2OIPChanged, isMcastQuerierIPChanged, err
}

v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if v4ip != "" || v6ip != "" {
isU2OIPChanged = true
}
} else if subnet.Status.U2OInterconnectionIP != "" {
err = c.releaseU2OIP(subnet, u2oInterconnName)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
isU2OIPChanged = true
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
if err := c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", ""); err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return err
}

subnet.Status.U2OInterconnectionMAC = mac
needCalcIP = true
if isU2OIPChanged {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
}
} else if subnet.Status.U2OInterconnectionIP != "" {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return err
}
// reconcile mcast querier IP
if subnet.Spec.EnableMulticastSnoop {
isMcastQuerierIPChanged, err = c.acquireMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
isMcastQuerierIPChanged, err = c.releaseMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}

needCalcIP = true
}

if needCalcIP {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
// caculate subnet status
if isU2OIPChanged || isMcastQuerierIPChanged {
if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
if _, err := c.calcDualSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
if _, err := c.calcSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
}
}

return isU2OIPChanged, isMcastQuerierIPChanged, nil
}

func (c *Controller) acquireU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName, u2oInterconnLrpName string) (string, string, string, error) {
var v4ip, v6ip, mac string
var err error
if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
}
v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
}
if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
err = c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return "", "", "", err
}
subnet.Status.U2OInterconnectionMAC = mac
}
return v4ip, v6ip, mac, nil
}

func (c *Controller) releaseU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName string) error {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""

err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return err
}

return nil
}

func (c *Controller) acquireMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
var v4ip, v6ip, mac string
var err error

if subnet.Status.McastQuerierIP == "" || subnet.Status.McastQuerierMAC == "" {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, mcastQuerierLspName, mcastQuerierLspName)
if err != nil {
klog.Errorf("failed to acquire mcast querier ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.McastQuerierIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.McastQuerierIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.McastQuerierIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}

err := c.createOrUpdateIPCR("", mcastQuerierLspName, subnet.Status.McastQuerierIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}

subnet.Status.McastQuerierMAC = mac
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
isMcastQuerierChanged = true
}

return isMcastQuerierChanged, nil
}

func (c *Controller) releaseMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
if subnet.Status.McastQuerierIP != "" {
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
klog.Infof("release mcast querier ip address %s for subnet %s", subnet.Status.McastQuerierIP, subnet.Name)
c.ipam.ReleaseAddressByPod(mcastQuerierLspName, subnet.Name)
subnet.Status.McastQuerierIP = ""
subnet.Status.McastQuerierMAC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), mcastQuerierLspName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}
}
isMcastQuerierChanged = true
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
}
return isMcastQuerierChanged, nil
}

func (c *Controller) calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet) (*kubeovnv1.Subnet, error) {
if err := util.CheckCidrs(subnet.Spec.CIDRBlock); err != nil {
return nil, err
Expand Down Expand Up @@ -3133,3 +3225,52 @@ func (c *Controller) findSubnetByNetworkAttachmentDefinition(ns, name string, su

return subnet, nil
}

func (c *Controller) handleMcastQuerierChange(subnet *kubeovnv1.Subnet) error {
if subnet.Spec.EnableMulticastSnoop {
multicastSnoopFlag := map[string]string{
"mcast_snoop": "true",
"mcast_querier": "true",
"mcast_ip4_src": subnet.Status.McastQuerierIP,
"mcast_eth_src": subnet.Status.McastQuerierMAC,
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.CreateLogicalSwitchPort(subnet.Name, mcastQuerierLspName, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC, mcastQuerierLspName, "default", false, "", "", false, nil, ""); err != nil {
err = fmt.Errorf("failed to create mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}

if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
lss, err := c.OVNNbClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool {
return ls.Name == subnet.Name
})
if err != nil || len(lss) == 0 {
klog.Errorf("failed to list logical switch %s: %v", subnet.Name, err)
return err
}

multicastSnoopFlag := map[string]string{
"mcast_snoop": lss[0].OtherConfig["mcast_snoop"],
"mcast_querier": lss[0].OtherConfig["mcast_querier"],
"mcast_ip4_src": lss[0].OtherConfig["mcast_ip4_src"],
"mcast_eth_src": lss[0].OtherConfig["mcast_eth_src"],
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}

if err := c.OVNNbClient.DeleteLogicalSwitchPort(mcastQuerierLspName); err != nil {
err = fmt.Errorf("failed to delete mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ const (
U2OInterconnName = "u2o-interconnection.%s.%s"
U2OExcludeIPAg = "%s.u2o_exclude_ip.%s"

McastQuerierName = "mcast-querier.%s"

DefaultServiceSessionStickinessTimeout = 10800

OvnSubnetGatewayIptables = "ovn-subnet-gateway"
Expand Down
Loading

0 comments on commit 1fe6c44

Please sign in to comment.