diff --git a/pkg/daemon/gateway.go b/pkg/daemon/gateway.go index 5c3af620a49..9333a8aeac3 100644 --- a/pkg/daemon/gateway.go +++ b/pkg/daemon/gateway.go @@ -260,7 +260,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) { } for _, pod := range pods { - if pod.Spec.NodeName != c.config.NodeName { + if pod.Spec.HostNetwork || pod.Spec.NodeName != c.config.NodeName { continue } @@ -271,7 +271,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) { subnet, err := c.subnetsLister.Get(subnetName) if err != nil { - return nil, fmt.Errorf("failed to get subnet '%s', err: %w", subnetName, err) + return nil, fmt.Errorf("failed to get subnet %q: %w", subnetName, err) } if subnet.Spec.Vpc == c.config.ClusterRouter { diff --git a/pkg/daemon/gateway_linux.go b/pkg/daemon/gateway_linux.go index 7557d7f30a1..916cdf43c18 100644 --- a/pkg/daemon/gateway_linux.go +++ b/pkg/daemon/gateway_linux.go @@ -62,6 +62,11 @@ const ( TProxyPreroutingMask = util.TProxyPreroutingMask ) +var ( + tProxyOutputMarkMask = fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask) + tProxyPreRoutingMarkMask = fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask) +) + type policyRouteMeta struct { family int source string @@ -856,7 +861,6 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e ipt := c.iptables[protocol] tproxyPreRoutingRules := make([]util.IPTableRule, 0) tproxyOutputRules := make([]util.IPTableRule, 0) - probePorts := strset.New() pods, err := c.getTProxyConditionPod(true) if err != nil { @@ -877,54 +881,12 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e continue } - for _, container := range pod.Spec.Containers { - if container.ReadinessProbe != nil { - if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil { - if port := httpGet.Port.String(); port != "" { - probePorts.Add(port) - } - } - - if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil { - if port := tcpSocket.Port.String(); port != "" { - if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok { - if isTCPProbePortReachable.(bool) { - probePorts.Add(port) - } - } - } - } - } - - if container.LivenessProbe != nil { - if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil { - if port := httpGet.Port.String(); port != "" { - probePorts.Add(port) - } - } - - if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil { - if port := tcpSocket.Port.String(); port != "" { - if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok { - if isTCPProbePortReachable.(bool) { - probePorts.Add(port) - } - } - } - } - } - } - - if probePorts.IsEmpty() { + ports := getProbePorts(pod) + if ports.Len() == 0 { continue } - probePortList := probePorts.List() - sort.Strings(probePortList) - for _, probePort := range probePortList { - tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask) - tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask) - + for _, probePort := range ports.SortedList() { hostIP := pod.Status.HostIP prefixLen := 32 if protocol == kubeovnv1.ProtocolIPv6 { @@ -938,8 +900,8 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e hostIP = "::" } } - tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))}) - tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))}) + tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))}) + tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))}) } } diff --git a/pkg/daemon/tproxy_linux.go b/pkg/daemon/tproxy_linux.go index d0bd3801945..7c8e66c3d43 100644 --- a/pkg/daemon/tproxy_linux.go +++ b/pkg/daemon/tproxy_linux.go @@ -10,11 +10,13 @@ import ( "syscall" "github.com/containernetworking/plugins/pkg/ns" - "github.com/scylladb/go-set/strset" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "k8s.io/utils/ptr" + "k8s.io/utils/set" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/ovs" @@ -56,9 +58,43 @@ func (c *Controller) StartTProxyForwarding() { } } -func (c *Controller) StartTProxyTCPPortProbe() { - probePorts := strset.New() +func getProbePorts(pod *corev1.Pod) set.Set[int32] { + ports := set.New[int32]() + for _, container := range pod.Spec.Containers { + for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} { + if probe == nil { + continue + } + var port intstr.IntOrString + switch { + case probe.TCPSocket != nil: + port = probe.TCPSocket.Port + case probe.HTTPGet != nil: + port = probe.HTTPGet.Port + case probe.GRPC != nil: + port = intstr.FromInt32(probe.GRPC.Port) + default: + continue + } + if port.Type == intstr.Int { + ports.Insert(port.IntVal) + continue + } + for _, p := range container.Ports { + if p.Name == port.StrVal { + ports.Insert(p.ContainerPort) + break + } + } + } + } + ports.Delete(0) + klog.Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList()) + return ports +} + +func (c *Controller) StartTProxyTCPPortProbe() { pods, err := c.getTProxyConditionPod(false) if err != nil { return @@ -67,33 +103,19 @@ func (c *Controller) StartTProxyTCPPortProbe() { for _, pod := range pods { iface := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider) nsName, err := ovs.GetInterfacePodNs(iface) - if err != nil || nsName == "" { - klog.Infof("iface %s's namespace not found", iface) + if err != nil { + klog.Errorf("failed to get netns for pod %s/%s: %v", pod.Namespace, pod.Name, err) + continue + } + if nsName == "" { + klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name) continue } + ports := getProbePorts(pod) for _, podIP := range pod.Status.PodIPs { customVPCPodIPToNs.Store(podIP.IP, nsName) - for _, container := range pod.Spec.Containers { - if container.ReadinessProbe != nil { - if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil { - if port := tcpSocket.Port.String(); port != "" { - probePorts.Add(port) - } - } - } - - if container.LivenessProbe != nil { - if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil { - if port := tcpSocket.Port.String(); port != "" { - probePorts.Add(port) - } - } - } - } - - probePortsList := probePorts.List() - for _, port := range probePortsList { + for _, port := range ports.UnsortedList() { probePortInNs(podIP.IP, port, true, nil) } } @@ -264,7 +286,7 @@ func delRouteIfExist(family, table int, dst *net.IPNet) error { } func handleRedirectFlow(conn net.Conn) { - klog.V(5).Infof("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String()) + klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr()) defer func() { if err := conn.Close(); err != nil { klog.Errorf("conn Close err: %v", err) @@ -278,42 +300,44 @@ func handleRedirectFlow(conn net.Conn) { return } - probePortInNs(podIP, probePort, false, conn) + port, err := strconv.ParseInt(probePort, 10, 32) + if err != nil { + klog.Errorf("failed to parse port number %q: %v", probePort, err) + return + } + + probePortInNs(podIP, int32(port), false, conn) // #nosec G115 } -func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) { +func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) { podNs, ok := customVPCPodIPToNs.Load(podIP) if !ok { - return - } - - iprobePort, err := strconv.Atoi(probePort) - if err != nil { + klog.V(3).Infof("failed to get netns for pod with ip %s", podIP) return } podNS, err := ns.GetNS(podNs.(string)) if err != nil { customVPCPodIPToNs.Delete(podIP) - klog.Infof("ns %s already deleted", podNs) + klog.V(3).Infof("netns %s not found", podNs) return } _ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error { // Packet's src and dst IP are both PodIP in netns localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)} - remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: iprobePort} + remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)} remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe) if err != nil { if isTProxyProbe { - customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false) + customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false) } return nil } if isTProxyProbe { - customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), true) + customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true) return nil } @@ -342,10 +366,6 @@ func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) { }) } -func getIPPortString(podIP, port string) string { - return fmt.Sprintf("%s|%s", podIP, port) -} - func getProtocols(protocol string) []string { var protocols []string if protocol == kubeovnv1.ProtocolDual { diff --git a/test/e2e/kube-ovn/pod/vpc_pod_probe.go b/test/e2e/kube-ovn/pod/vpc_pod_probe.go index 233ced69f40..ff7e7f0247b 100644 --- a/test/e2e/kube-ovn/pod/vpc_pod_probe.go +++ b/test/e2e/kube-ovn/pod/vpc_pod_probe.go @@ -17,6 +17,11 @@ import ( "github.com/kubeovn/kube-ovn/test/e2e/framework/iptables" ) +var ( + tProxyOutputMarkMask = fmt.Sprintf("%#x/%#x", util.TProxyOutputMark, util.TProxyOutputMask) + tProxyPreRoutingMarkMask = fmt.Sprintf("%#x/%#x", util.TProxyPreroutingMark, util.TProxyPreroutingMask) +) + var _ = framework.SerialDescribe("[group:pod]", func() { f := framework.NewDefaultFramework("vpc-pod-probe") @@ -175,7 +180,7 @@ var _ = framework.SerialDescribe("[group:pod]", func() { framework.ExpectTrue(found, "Pod readiness probe is expected to fail") pod = podClient.GetPod(podName) - checkTProxyRules(f, pod, port-1, false) + checkTProxyRules(f, pod, port-1, true) }) }) @@ -183,9 +188,6 @@ func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int32, ginkgo.GinkgoHelper() nodeName := pod.Spec.NodeName - tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", util.TProxyOutputMark, util.TProxyOutputMask) - tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", util.TProxyPreroutingMark, util.TProxyPreroutingMask) - isZeroIP := false if len(pod.Status.PodIPs) == 2 { isZeroIP = true