Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mcast querier ip for multicast #4375

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,10 @@ spec:
type: string
u2oInterconnectionMAC:
type: string
mcastQuerierIP:
type: string
mcastQuerierMAC:
type: string
u2oInterconnectionVPC:
type: string
v4usingIPrange:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ type SubnetStatus struct {
U2OInterconnectionMAC string `json:"u2oInterconnectionMAC"`
U2OInterconnectionVPC string `json:"u2oInterconnectionVPC"`
NatOutgoingPolicyRules []NatOutgoingPolicyRuleStatus `json:"natOutgoingPolicyRules"`
McastQuerierIP string `json:"mcastQuerierIP"`
McastQuerierMAC string `json:"mcastQuerierMAC"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ func (c *Controller) createOrUpdateIPCR(ipCRName, podName, ip, mac, subnetName,
case strings.HasPrefix(podName, util.U2OInterconnName[0:19]):
key = podName // interconn IP name
ipName = podName
case strings.HasPrefix(podName, util.McastQuerierName[0:13]):
key = podName // mcast querier IP name
ipName = podName
}
}

Expand Down
277 changes: 209 additions & 68 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,14 +812,11 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err)
return err
}

if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
if err := c.reconcileU2OInterconnectionIP(subnet); err != nil {
klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err)
return err
}
_, isMcastQuerierChanged, err := c.reconcileSubnetSpecialIPs(subnet)
if err != nil {
klog.Errorf("failed to reconcile subnet %s Custom IPs %v", subnet.Name, err)
return err
}

if err := c.checkSubnetConflict(subnet); err != nil {
klog.Errorf("failed to check subnet %s, %v", subnet.Name, err)
return err
Expand Down Expand Up @@ -850,15 +847,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return err
}

multicastSnoopFlag := map[string]string{"mcast_snoop": "true", "mcast_querier": "false"}
if subnet.Spec.EnableMulticastSnoop {
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
if isMcastQuerierChanged {
if err := c.handleMcastQuerierChange(subnet); err != nil {
klog.Errorf("failed to handle mcast querier IP change for subnet %s: %v", subnet.Name, err)
return err
}
}
Expand Down Expand Up @@ -1944,85 +1935,186 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
return nil
}

func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {
needCalcIP := false
if subnet.Spec.U2OInterconnection {
func (c *Controller) reconcileSubnetSpecialIPs(subnet *kubeovnv1.Subnet) (bool, bool, error) {
isU2OIPChanged := false
isMcastQuerierIPChanged := false
var err error

// reconcile u2o IP
if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name)
var v4ip, v6ip, mac string
var err error
if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
var v4ip, v6ip string
if subnet.Spec.U2OInterconnection {
v4ip, v6ip, _, err = c.acquireU2OIP(subnet, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
return isU2OIPChanged, isMcastQuerierIPChanged, err
}

v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if v4ip != "" || v6ip != "" {
isU2OIPChanged = true
}
} else if subnet.Status.U2OInterconnectionIP != "" {
err = c.releaseU2OIP(subnet, u2oInterconnName)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
isU2OIPChanged = true
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
if err := c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", ""); err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return err
}

subnet.Status.U2OInterconnectionMAC = mac
needCalcIP = true
if isU2OIPChanged {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
}
} else if subnet.Status.U2OInterconnectionIP != "" {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return err
}
// reconcile mcast querier IP
if subnet.Spec.EnableMulticastSnoop {
isMcastQuerierIPChanged, err = c.acquireMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
isMcastQuerierIPChanged, err = c.releaseMcastQuerierIP(subnet)
if err != nil {
return isU2OIPChanged, isMcastQuerierIPChanged, err
}

needCalcIP = true
}

if needCalcIP {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
// caculate subnet status
if isU2OIPChanged || isMcastQuerierIPChanged {
if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
if _, err := c.calcDualSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
} else {
if _, err := c.calcSubnetStatusIP(subnet); err != nil {
klog.Error(err)
return err
return isU2OIPChanged, isMcastQuerierIPChanged, err
}
}
}

return isU2OIPChanged, isMcastQuerierIPChanged, nil
}

func (c *Controller) acquireU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName, u2oInterconnLrpName string) (string, string, string, error) {
var v4ip, v6ip, mac string
var err error
if subnet.Spec.U2OInterconnectionIP == "" && (subnet.Status.U2OInterconnectionIP == "" || subnet.Status.U2OInterconnectionMAC == "") {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName)
if err != nil {
klog.Errorf("failed to acquire underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
} else if subnet.Spec.U2OInterconnectionIP != "" && subnet.Status.U2OInterconnectionIP != subnet.Spec.U2OInterconnectionIP {
if subnet.Status.U2OInterconnectionIP != "" {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
}
v4ip, v6ip, mac, err = c.acquireStaticIPAddress(subnet.Name, u2oInterconnName, u2oInterconnLrpName, subnet.Spec.U2OInterconnectionIP)
if err != nil {
klog.Errorf("failed to acquire static underlay to overlay interconnection ip address for subnet %s, %v", subnet.Name, err)
return "", "", "", err
}
}
if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.U2OInterconnectionIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.U2OInterconnectionIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.U2OInterconnectionIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}
err = c.createOrUpdateIPCR("", u2oInterconnName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", u2oInterconnLrpName, err)
return "", "", "", err
}
subnet.Status.U2OInterconnectionMAC = mac
}
return v4ip, v6ip, mac, nil
}

func (c *Controller) releaseU2OIP(subnet *kubeovnv1.Subnet, u2oInterconnName string) error {
klog.Infof("release underlay to overlay interconnection ip address %s for subnet %s", subnet.Status.U2OInterconnectionIP, subnet.Name)
c.ipam.ReleaseAddressByPod(u2oInterconnName, subnet.Name)
subnet.Status.U2OInterconnectionIP = ""
subnet.Status.U2OInterconnectionMAC = ""
subnet.Status.U2OInterconnectionVPC = ""

err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), u2oInterconnName, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", u2oInterconnName, err)
return err
}

return nil
}

func (c *Controller) acquireMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
var v4ip, v6ip, mac string
var err error

if subnet.Status.McastQuerierIP == "" || subnet.Status.McastQuerierMAC == "" {
v4ip, v6ip, mac, err = c.acquireIPAddress(subnet.Name, mcastQuerierLspName, mcastQuerierLspName)
if err != nil {
klog.Errorf("failed to acquire mcast querier ip address for subnet %s, %v", subnet.Name, err)
return isMcastQuerierChanged, err
}
}

if v4ip != "" || v6ip != "" {
switch subnet.Spec.Protocol {
case kubeovnv1.ProtocolIPv4:
subnet.Status.McastQuerierIP = v4ip
case kubeovnv1.ProtocolIPv6:
subnet.Status.McastQuerierIP = v6ip
case kubeovnv1.ProtocolDual:
subnet.Status.McastQuerierIP = fmt.Sprintf("%s,%s", v4ip, v6ip)
}

err := c.createOrUpdateIPCR("", mcastQuerierLspName, subnet.Status.McastQuerierIP, mac, subnet.Name, "default", "", "")
if err != nil {
klog.Errorf("failed to create or update IPs of %s : %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}

subnet.Status.McastQuerierMAC = mac
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
isMcastQuerierChanged = true
}

return isMcastQuerierChanged, nil
}

func (c *Controller) releaseMcastQuerierIP(subnet *kubeovnv1.Subnet) (bool, error) {
isMcastQuerierChanged := false
if subnet.Status.McastQuerierIP != "" {
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
klog.Infof("release mcast querier ip address %s for subnet %s", subnet.Status.McastQuerierIP, subnet.Name)
c.ipam.ReleaseAddressByPod(mcastQuerierLspName, subnet.Name)
subnet.Status.McastQuerierIP = ""
subnet.Status.McastQuerierMAC = ""

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), mcastQuerierLspName, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", mcastQuerierLspName, err)
return isMcastQuerierChanged, err
}
}
isMcastQuerierChanged = true
klog.Infof("reconcile subnet %s mcast querier IP %s mac %s",
subnet.Name, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC)
}
return isMcastQuerierChanged, nil
}

func (c *Controller) calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet) (*kubeovnv1.Subnet, error) {
if err := util.CheckCidrs(subnet.Spec.CIDRBlock); err != nil {
return nil, err
Expand Down Expand Up @@ -3262,3 +3354,52 @@ func (c *Controller) findSubnetByNetworkAttachmentDefinition(ns, name string, su

return subnet, nil
}

func (c *Controller) handleMcastQuerierChange(subnet *kubeovnv1.Subnet) error {
if subnet.Spec.EnableMulticastSnoop {
multicastSnoopFlag := map[string]string{
"mcast_snoop": "true",
"mcast_querier": "true",
"mcast_ip4_src": subnet.Status.McastQuerierIP,
"mcast_eth_src": subnet.Status.McastQuerierMAC,
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.CreateLogicalSwitchPort(subnet.Name, mcastQuerierLspName, subnet.Status.McastQuerierIP, subnet.Status.McastQuerierMAC, mcastQuerierLspName, "default", false, "", "", false, nil, ""); err != nil {
err = fmt.Errorf("failed to create mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}

if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationInsert, multicastSnoopFlag); err != nil {
klog.Errorf("enable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}
} else {
lss, err := c.OVNNbClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool {
return ls.Name == subnet.Name
})
if err != nil || len(lss) == 0 {
klog.Errorf("failed to list logical switch %s: %v", subnet.Name, err)
return err
}

multicastSnoopFlag := map[string]string{
"mcast_snoop": lss[0].OtherConfig["mcast_snoop"],
"mcast_querier": lss[0].OtherConfig["mcast_querier"],
"mcast_ip4_src": lss[0].OtherConfig["mcast_ip4_src"],
"mcast_eth_src": lss[0].OtherConfig["mcast_eth_src"],
}
mcastQuerierLspName := fmt.Sprintf(util.McastQuerierName, subnet.Name)
if err := c.OVNNbClient.LogicalSwitchUpdateOtherConfig(subnet.Name, ovsdb.MutateOperationDelete, multicastSnoopFlag); err != nil {
klog.Errorf("disable logical switch multicast snoop %s: %v", subnet.Name, err)
return err
}

if err := c.OVNNbClient.DeleteLogicalSwitchPort(mcastQuerierLspName); err != nil {
err = fmt.Errorf("failed to delete mcast querier lsp %s: %w", mcastQuerierLspName, err)
klog.Error(err)
return err
}
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ const (
U2OInterconnName = "u2o-interconnection.%s.%s"
U2OExcludeIPAg = "%s.u2o_exclude_ip.%s"

McastQuerierName = "mcast-querier.%s"

DefaultServiceSessionStickinessTimeout = 10800

OvnSubnetGatewayIptables = "ovn-subnet-gateway"
Expand Down
Loading
Loading