Skip to content

Commit

Permalink
support tcpCheck in podProbe
Browse files Browse the repository at this point in the history
  • Loading branch information
BH4AWS committed Dec 20, 2023
1 parent fa9a9a0 commit 900f3cd
Show file tree
Hide file tree
Showing 14 changed files with 493 additions and 74 deletions.
2 changes: 2 additions & 0 deletions apis/apps/v1alpha1/node_pod_probe_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type PodProbe struct {
Namespace string `json:"namespace"`
// pod uid
UID string `json:"uid"`
// pod ip
IP string `json:"IP"`
// Custom container probe, supports Exec, Tcp, and returns the result to Pod yaml
Probes []ContainerProbe `json:"probes,omitempty"`
}
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_nodepodprobes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ spec:
podProbes:
items:
properties:
IP:
description: pod ip
type: string
name:
description: pod name
type: string
Expand Down Expand Up @@ -221,6 +224,7 @@ spec:
description: pod uid
type: string
required:
- IP
- name
- namespace
- uid
Expand Down
22 changes: 13 additions & 9 deletions pkg/controller/podprobemarker/pod_probe_marker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ import (
"reflect"
"strings"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -44,6 +36,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
)

func init() {
Expand Down Expand Up @@ -234,13 +235,16 @@ func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbe
exist = true
for j := range ppm.Spec.Probes {
probe := ppm.Spec.Probes[j]
if podProbe.IP == "" {
podProbe.IP = pod.Status.PodIP
}
setPodContainerProbes(podProbe, probe, ppm.Name)
}
break
}
}
if !exist {
podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID)}
podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID), IP: pod.Status.PodIP}
for j := range ppm.Spec.Probes {
probe := ppm.Spec.Probes[j]
podProbe.Probes = append(podProbe.Probes, appsv1alpha1.ContainerProbe{
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/podprobemarker/podprobemarker_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package podprobemarker
import (
"context"

appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -33,6 +30,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
)

var _ handler.EventHandler = &enqueueRequestForPodProbeMarker{}
Expand Down Expand Up @@ -95,8 +96,8 @@ func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLim
if newInitialCondition == nil {
return
}
if kubecontroller.IsPodActive(new) && (oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) &&
newInitialCondition.Status == corev1.ConditionTrue {
if kubecontroller.IsPodActive(new) && (((oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) &&
newInitialCondition.Status == corev1.ConditionTrue) || (old.Status.PodIP != new.Status.PodIP)) {
ppms, err := p.getPodProbeMarkerForPod(new)
if err != nil {
klog.Errorf("List PodProbeMarker fialed: %s", err.Error())
Expand Down
22 changes: 12 additions & 10 deletions pkg/daemon/podprobe/pod_probe_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ import (
"sync"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
"github.com/openkruise/kruise/pkg/daemon/util"
commonutil "github.com/openkruise/kruise/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -51,6 +42,16 @@ import (
"k8s.io/gengo/examples/set-gen/sets"
"k8s.io/klog/v2"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
"github.com/openkruise/kruise/pkg/daemon/util"
commonutil "github.com/openkruise/kruise/pkg/util"
)

const (
Expand All @@ -63,6 +64,7 @@ type probeKey struct {
podNs string
podName string
podUID string
podIP string
containerName string
probeName string
}
Expand Down Expand Up @@ -249,7 +251,7 @@ func (c *Controller) sync() error {
c.workerLock.Lock()
validWorkers := map[probeKey]struct{}{}
for _, podProbe := range npp.Spec.PodProbes {
key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID}
key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID, podIP: podProbe.IP}
for i := range podProbe.Probes {
probe := podProbe.Probes[i]
key.containerName = probe.ContainerName
Expand Down
87 changes: 70 additions & 17 deletions pkg/daemon/podprobe/pod_probe_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import (
"testing"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
commonutil "github.com/openkruise/kruise/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
commonutil "github.com/openkruise/kruise/pkg/util"
)

var (
Expand All @@ -44,6 +46,7 @@ var (
{
Name: "pod-0",
UID: "pod-0-uid",
IP: "1.1.1.1",
Probes: []appsv1alpha1.ContainerProbe{
{
Name: "ppm-1#healthy",
Expand All @@ -64,6 +67,7 @@ var (
{
Name: "pod-1",
UID: "pod-1-uid",
IP: "2.2.2.2",
Probes: []appsv1alpha1.ContainerProbe{
{
Name: "ppm-1#healthy",
Expand All @@ -81,6 +85,27 @@ var (
},
},
},
{
Name: "pod-2",
UID: "pod-2-uid",
IP: "3.3.3.3",
Probes: []appsv1alpha1.ContainerProbe{
{
Name: "ppm-1#tcpCheck",
ContainerName: "main",
Probe: appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)},
Host: "3.3.3.3",
},
},
},
},
},
},
},
},
},
}
Expand All @@ -96,7 +121,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) {
{
name: "test1, update pod probe status",
getUpdate: func() Update {
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
},
getNodePodProbe: func() *appsv1alpha1.NodePodProbe {
demo := demoNodePodProbe.DeepCopy()
Expand Down Expand Up @@ -144,10 +169,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) {
return obj
},
},

{
name: "test2, update pod probe status",
getUpdate: func() Update {
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
},
getNodePodProbe: func() *appsv1alpha1.NodePodProbe {
demo := demoNodePodProbe.DeepCopy()
Expand Down Expand Up @@ -227,10 +253,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) {
return obj
},
},

{
name: "test3, update pod probe status",
getUpdate: func() Update {
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded}
},
getNodePodProbe: func() *appsv1alpha1.NodePodProbe {
demo := demoNodePodProbe.DeepCopy()
Expand Down Expand Up @@ -286,6 +313,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) {
return obj
},
},

{
name: "test4, update pod probe status",
getUpdate: func() Update {
Expand Down Expand Up @@ -412,7 +440,7 @@ func TestSyncNodePodProbe(t *testing.T) {
},
setWorkers: func(c *Controller) {
c.workers = map[probeKey]*worker{}
key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#check"}
key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#check"}
c.workers[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand All @@ -423,7 +451,7 @@ func TestSyncNodePodProbe(t *testing.T) {
},
})
go c.workers[key1].run()
key2 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}
key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"}
c.workers[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand All @@ -437,8 +465,8 @@ func TestSyncNodePodProbe(t *testing.T) {
},
expectWorkers: func(c *Controller) map[probeKey]*worker {
expect := map[probeKey]*worker{}
key := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}
expect[key] = newWorker(c, key, &appsv1alpha1.ContainerProbeSpec{
key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}
expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Expand All @@ -448,6 +476,17 @@ func TestSyncNodePodProbe(t *testing.T) {
InitialDelaySeconds: 100,
},
})
key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"}
expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)},
Host: "3.3.3.3",
},
},
},
})
return expect
},
},
Expand Down Expand Up @@ -476,7 +515,19 @@ func TestSyncNodePodProbe(t *testing.T) {
},
expectWorkers: func(c *Controller) map[probeKey]*worker {
expect := map[probeKey]*worker{}
key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}
key0 := probeKey{"", "pod-0", "pod-0-uid", "1.1.1.1", "main", "ppm-1#healthy"}
expect[key0] = newWorker(c, key0, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: 100,
},
})

key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}
expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand All @@ -487,7 +538,8 @@ func TestSyncNodePodProbe(t *testing.T) {
InitialDelaySeconds: 100,
},
})
key2 := probeKey{"", "pod-1", "pod-1-uid", "nginx", "ppm-1#check"}

key2 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "nginx", "ppm-1#check"}
expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand All @@ -498,15 +550,16 @@ func TestSyncNodePodProbe(t *testing.T) {
InitialDelaySeconds: 100,
},
})
key3 := probeKey{"", "pod-0", "pod-0-uid", "main", "ppm-1#healthy"}

key3 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"}
expect[key3] = newWorker(c, key3, &appsv1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)},
Host: "3.3.3.3",
},
},
InitialDelaySeconds: 100,
},
})
return expect
Expand Down
Loading

0 comments on commit 900f3cd

Please sign in to comment.