Skip to content

Commit

Permalink
custom vpc pod support tcp http probe with tproxy method
Browse files Browse the repository at this point in the history
  • Loading branch information
changluyi committed Jul 10, 2023
1 parent f2d063a commit 95c2a80
Show file tree
Hide file tree
Showing 15 changed files with 993 additions and 17 deletions.
15 changes: 1 addition & 14 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/sample-controller/pkg/signals"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -96,19 +95,7 @@ func CmdMain() {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

addr := "0.0.0.0"
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
podIpsEnv := os.Getenv("POD_IPS")
podIps := strings.Split(podIpsEnv, ",")
// when pod in dual mode, golang can't support bind v4 and v6 address in the same time,
// so not support bind local ip when in dual mode
if len(podIps) == 1 {
addr = podIps[0]
if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 {
addr = fmt.Sprintf("[%s]", podIps[0])
}
}
}
addr := daemon.GetDefaultListenPort()

if config.EnableVerboseConnCheck {
go func() {
Expand Down
6 changes: 6 additions & 0 deletions dist/images/uninstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ iptables -t filter -D FORWARD -m set --match-set ovn40subnets src -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services dst -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services src -j ACCEPT
iptables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
iptables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
iptables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
iptables -t mangle -F OVN-PREROUTING
iptables -t mangle -X OVN-PREROUTING
iptables -t mangle -F OVN-OUTPUT
iptables -t mangle -X OVN-OUTPUT

sleep 1

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kubeovn/kube-ovn
go 1.20

require (
github.com/Asphaltt/go-tproxy v0.0.0-20210424070945-9f75cc83f122
github.com/Microsoft/go-winio v0.6.1
github.com/Microsoft/hcsshim v0.9.9
github.com/alauda/felix v3.6.6-0.20201207121355-187332daf314+incompatible
Expand Down Expand Up @@ -44,6 +45,7 @@ require (
k8s.io/apimachinery v0.27.3
k8s.io/apiserver v0.27.3
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.100.1
k8s.io/kubectl v0.27.3
k8s.io/kubernetes v1.27.3
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Asphaltt/go-tproxy v0.0.0-20210424070945-9f75cc83f122 h1:M4Ko8IFCm5Rf0ARszPK0ATqxKEPzj3Sx53Cg8tZf1Ww=
github.com/Asphaltt/go-tproxy v0.0.0-20210424070945-9f75cc83f122/go.mod h1:1BDKyG3O1sOtCLRCWZVCSIuUckqksbJxXTggumT8Yhg=
github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4=
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
Expand Down Expand Up @@ -2296,6 +2298,7 @@ k8s.io/gengo v0.0.0-20220902162205-c0856e24416d/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAE
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Configuration struct {
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -100,6 +101,7 @@ func ParseFlags() *Configuration {
argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", true, "enable tproxy for vpc pod liveness or readiness probe")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -154,6 +156,7 @@ func ParseFlags() *Configuration {
EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
}
return config
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,18 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}
}, 5*time.Minute, stopCh)

if c.config.EnableTProxy {
go c.StartTProxyForwarding(stopCh)
go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
// kubelet to tproxy, if probe success recover the iptable rules
go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)

} else {
c.cleanTProxyConfig()
}

<-stopCh
klog.Info("Shutting down workers")
}
Expand Down
170 changes: 170 additions & 0 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ const (

const (
NAT = "nat"
MANGLE = "mangle"
Prerouting = "PREROUTING"
Postrouting = "POSTROUTING"
Output = "OUTPUT"
OvnPrerouting = "OVN-PREROUTING"
OvnPostrouting = "OVN-POSTROUTING"
OvnOutput = "OVN-OUTPUT"
OvnMasquerade = "OVN-MASQUERADE"
OvnNatOutGoingPolicy = "OVN-NAT-POLICY"
OvnNatOutGoingPolicySubnet = "OVN-NAT-PSUBNET-"
Expand All @@ -51,6 +54,10 @@ const (
const (
OnOutGoingNatMark = "0x90001/0x90001"
OnOutGoingForwardMark = "0x90002/0x90002"
TProxyPostroutingMark = 0x90003
TProxyPostroutingMask = 0x90003
TProxyPreroutingMark = 0x90004
TProxyPreroutingMask = 0x90004
)

type policyRouteMeta struct {
Expand Down Expand Up @@ -736,6 +743,10 @@ func (c *Controller) setIptables() error {
return err
}

if err = c.reconcileTProxyIPTableRules(protocol); err != nil {
return err
}

if err = c.updateIptablesChain(ipt, NAT, OvnPrerouting, Prerouting, natPreroutingRules); err != nil {
klog.Errorf("failed to update chain %s/%s: %v", NAT, OvnPrerouting)
return err
Expand All @@ -757,6 +768,150 @@ func (c *Controller) setIptables() error {
return nil
}

func (c *Controller) reconcileTProxyIPTableRules(protocol string) error {
if !c.config.EnableTProxy {
return nil
}

ipt := c.iptables[protocol]
tproxyPreRoutingRules := make([]util.IPTableRule, 0)
tproxyOutputRules := make([]util.IPTableRule, 0)
var probePorts, podNames []string

pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed, %v", err)
return err
}

for _, pod := range pods {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}

sort.Strings(podNames)

for _, podName := range podNames {
items := strings.Split(podName, "/")
nsName := items[0]
name := items[1]
pod, err := c.podsLister.Pods(nsName).Get(name)
if err != nil {
klog.Errorf("get pod %s/%s failed, %v", nsName, name, err)
return err
}

subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}

podIP, ok := pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}

hostIP := pod.Status.HostIP

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
err = fmt.Errorf("failed to get subnet '%s', err: %v", subnetName, err)
continue
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
continue
}

for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}
}

if container.LivenessProbe != nil {
if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}
}
}

if len(probePorts) == 0 {
continue
}

probePorts = formatProbePorts(probePorts)
for _, probePort := range probePorts {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, probePort)); ok {
if !isTCPProbePortReachable.(bool) {
continue
}
}

tProxyPostroutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPostroutingMark, TProxyPostroutingMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)
if protocol == kubeovnv1.ProtocolIPv4 {
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyPostroutingMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
if protocol == kubeovnv1.ProtocolIPv6 {
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyPostroutingMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
}
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnPrerouting, Prerouting, tproxyPreRoutingRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnPrerouting, tproxyPreRoutingRules, err)
return err
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnOutput, Output, tproxyOutputRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnOutput, tproxyOutputRules, err)
return err
}
return nil
}

func (c *Controller) cleanTProxyIPTableRules(protocol string) {
ipt := c.iptables[protocol]
for _, chain := range []string{OvnPrerouting, OvnOutput} {
rules, err := ipt.List(MANGLE, chain)
if err != nil {
klog.Errorf("failed to list iptables rules in table %v chain %v, %+v", MANGLE, chain, err)
return
}
for _, rule := range rules {
if !strings.Contains(rule, fmt.Sprintf("%#x", TProxyPostroutingMark)) &&
!strings.Contains(rule, fmt.Sprintf("%#x", TProxyPreroutingMark)) {
continue
}
rule := rule[4+len(chain):]
spec := util.DoubleQuotedFields(rule)
if err = ipt.Delete(MANGLE, chain, spec...); err != nil {
klog.Errorf(`failed to delete iptables rule "%s": %v`, rule, err)
return
}
}
}
}

func (c *Controller) reconcileNatOutgoingPolicyIptablesChain(protocol string) error {
ipt := c.iptables[protocol]

Expand Down Expand Up @@ -1533,3 +1688,18 @@ func getNatPolicySubnetChainUID(chainName string) string {
func formatIPsetUnPrefix(ipsetName string) string {
return ipsetName[len("ovn40"):]
}

func formatProbePorts(probePorts []string) []string {
// Deduplicate and sort
retProbePorts := make([]string, 0, len(probePorts))
portMap := make(map[string]interface{})
for _, port := range probePorts {
if _, exist := portMap[port]; !exist {
retProbePorts = append(retProbePorts, port)
portMap[port] = nil
}
}

sort.Strings(retProbePorts)
return retProbePorts
}
22 changes: 22 additions & 0 deletions pkg/daemon/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
package daemon

import (
"fmt"
"net"
"os"
"strings"

"k8s.io/klog/v2"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func listen(socket string) (net.Listener, func(), error) {
Expand All @@ -23,3 +28,20 @@ func listen(socket string) (net.Listener, func(), error) {
}
}, nil
}

func GetDefaultListenPort() string {
addr := "0.0.0.0"
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
podIpsEnv := os.Getenv("POD_IPS")
podIps := strings.Split(podIpsEnv, ",")
// when pod in dual mode, golang can't support bind v4 and v6 address in the same time,
// so not support bind local ip when in dual mode
if len(podIps) == 1 {
addr = podIps[0]
if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 {
addr = fmt.Sprintf("[%s]", podIps[0])
}
}
}
return addr
}
Loading

0 comments on commit 95c2a80

Please sign in to comment.