Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tproxy: support named port #4487

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {
}

for _, pod := range pods {
if pod.Spec.NodeName != c.config.NodeName {
if pod.Spec.HostNetwork || pod.Spec.NodeName != c.config.NodeName {
continue
}

Expand All @@ -271,7 +271,7 @@ func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
return nil, fmt.Errorf("failed to get subnet '%s', err: %w", subnetName, err)
return nil, fmt.Errorf("failed to get subnet %q: %w", subnetName, err)
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
Expand Down
58 changes: 10 additions & 48 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
TProxyPreroutingMask = util.TProxyPreroutingMask
)

var (
tProxyOutputMarkMask = fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask = fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)
)

type policyRouteMeta struct {
family int
source string
Expand Down Expand Up @@ -856,7 +861,6 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
ipt := c.iptables[protocol]
tproxyPreRoutingRules := make([]util.IPTableRule, 0)
tproxyOutputRules := make([]util.IPTableRule, 0)
probePorts := strset.New()

pods, err := c.getTProxyConditionPod(true)
if err != nil {
Expand All @@ -877,54 +881,12 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
continue
}

for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}

if container.LivenessProbe != nil {
if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}
}

if probePorts.IsEmpty() {
ports := getProbePorts(pod)
if ports.Len() == 0 {
continue
}

probePortList := probePorts.List()
sort.Strings(probePortList)
for _, probePort := range probePortList {
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)

for _, probePort := range ports.SortedList() {
hostIP := pod.Status.HostIP
prefixLen := 32
if protocol == kubeovnv1.ProtocolIPv6 {
Expand All @@ -938,8 +900,8 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) e
hostIP = "::"
}
}
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
}

Expand Down
102 changes: 61 additions & 41 deletions pkg/daemon/tproxy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"syscall"

"github.com/containernetworking/plugins/pkg/ns"
"github.com/scylladb/go-set/strset"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"k8s.io/utils/set"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
Expand Down Expand Up @@ -56,9 +58,43 @@ func (c *Controller) StartTProxyForwarding() {
}
}

func (c *Controller) StartTProxyTCPPortProbe() {
probePorts := strset.New()
func getProbePorts(pod *corev1.Pod) set.Set[int32] {
ports := set.New[int32]()
for _, container := range pod.Spec.Containers {
for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} {
if probe == nil {
continue
}
var port intstr.IntOrString
switch {
case probe.TCPSocket != nil:
port = probe.TCPSocket.Port
case probe.HTTPGet != nil:
port = probe.HTTPGet.Port
case probe.GRPC != nil:
port = intstr.FromInt32(probe.GRPC.Port)
default:
continue
}
if port.Type == intstr.Int {
ports.Insert(port.IntVal)
continue
}
for _, p := range container.Ports {
if p.Name == port.StrVal {
ports.Insert(p.ContainerPort)
break
}
}
}
}

ports.Delete(0)
klog.Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
return ports
}

func (c *Controller) StartTProxyTCPPortProbe() {
pods, err := c.getTProxyConditionPod(false)
if err != nil {
return
Expand All @@ -67,33 +103,19 @@ func (c *Controller) StartTProxyTCPPortProbe() {
for _, pod := range pods {
iface := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider)
nsName, err := ovs.GetInterfacePodNs(iface)
if err != nil || nsName == "" {
klog.Infof("iface %s's namespace not found", iface)
if err != nil {
klog.Errorf("failed to get netns for pod %s/%s: %v", pod.Namespace, pod.Name, err)
continue
}
if nsName == "" {
klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name)
continue
}

ports := getProbePorts(pod)
for _, podIP := range pod.Status.PodIPs {
customVPCPodIPToNs.Store(podIP.IP, nsName)
for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts.Add(port)
}
}
}

if container.LivenessProbe != nil {
if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts.Add(port)
}
}
}
}

probePortsList := probePorts.List()
for _, port := range probePortsList {
for _, port := range ports.UnsortedList() {
probePortInNs(podIP.IP, port, true, nil)
}
}
Expand Down Expand Up @@ -264,7 +286,7 @@ func delRouteIfExist(family, table int, dst *net.IPNet) error {
}

func handleRedirectFlow(conn net.Conn) {
klog.V(5).Infof("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String())
klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
defer func() {
if err := conn.Close(); err != nil {
klog.Errorf("conn Close err: %v", err)
Expand All @@ -278,42 +300,44 @@ func handleRedirectFlow(conn net.Conn) {
return
}

probePortInNs(podIP, probePort, false, conn)
port, err := strconv.ParseInt(probePort, 10, 32)
if err != nil {
klog.Errorf("failed to parse port number %q: %v", probePort, err)
return
}

probePortInNs(podIP, int32(port), false, conn) // #nosec G115
}

func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) {
func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) {
podNs, ok := customVPCPodIPToNs.Load(podIP)
if !ok {
return
}

iprobePort, err := strconv.Atoi(probePort)
if err != nil {
klog.V(3).Infof("failed to get netns for pod with ip %s", podIP)
return
}

podNS, err := ns.GetNS(podNs.(string))
if err != nil {
customVPCPodIPToNs.Delete(podIP)
klog.Infof("ns %s already deleted", podNs)
klog.V(3).Infof("netns %s not found", podNs)
return
}

_ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error {
// Packet's src and dst IP are both PodIP in netns
localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)}
remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: iprobePort}
remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)}

remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe)
if err != nil {
if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false)
customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false)
}
return nil
}

if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), true)
customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true)
return nil
}

Expand Down Expand Up @@ -342,10 +366,6 @@ func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) {
})
}

func getIPPortString(podIP, port string) string {
return fmt.Sprintf("%s|%s", podIP, port)
}

func getProtocols(protocol string) []string {
var protocols []string
if protocol == kubeovnv1.ProtocolDual {
Expand Down
10 changes: 6 additions & 4 deletions test/e2e/kube-ovn/pod/vpc_pod_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/kubeovn/kube-ovn/test/e2e/framework/iptables"
)

var (
tProxyOutputMarkMask = fmt.Sprintf("%#x/%#x", util.TProxyOutputMark, util.TProxyOutputMask)
tProxyPreRoutingMarkMask = fmt.Sprintf("%#x/%#x", util.TProxyPreroutingMark, util.TProxyPreroutingMask)
)

var _ = framework.SerialDescribe("[group:pod]", func() {
f := framework.NewDefaultFramework("vpc-pod-probe")

Expand Down Expand Up @@ -175,17 +180,14 @@ var _ = framework.SerialDescribe("[group:pod]", func() {
framework.ExpectTrue(found, "Pod readiness probe is expected to fail")

pod = podClient.GetPod(podName)
checkTProxyRules(f, pod, port-1, false)
checkTProxyRules(f, pod, port-1, true)
})
})

func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int32, exist bool) {
ginkgo.GinkgoHelper()

nodeName := pod.Spec.NodeName
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", util.TProxyOutputMark, util.TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", util.TProxyPreroutingMark, util.TProxyPreroutingMask)

isZeroIP := false
if len(pod.Status.PodIPs) == 2 {
isZeroIP = true
Expand Down
Loading