Skip to content

Commit

Permalink
Merge pull request #293 from GoogleCloudPlatform/pod-informer
Browse files Browse the repository at this point in the history
use Pod informer to avoid Pod GET API calls
  • Loading branch information
songjiaxun committed Jul 5, 2024
2 parents 1dfe440 + 497087a commit 2a62228
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 22 deletions.
23 changes: 13 additions & 10 deletions cmd/csi_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ import (
)

var (
endpoint = flag.String("endpoint", "unix:/tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runController = flag.Bool("controller", false, "run controller service")
runNode = flag.Bool("node", false, "run node service")
kubeconfigPath = flag.String("kubeconfig-path", "", "The kubeconfig path.")
identityPool = flag.String("identity-pool", "", "The Identity Pool to authenticate with GCS API.")
identityProvider = flag.String("identity-provider", "", "The Identity Provider to authenticate with GCS API.")
enableProfiling = flag.Bool("enable-profiling", false, "enable the golang pprof at port 6060")
endpoint = flag.String("endpoint", "unix:/tmp/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "node id")
runController = flag.Bool("controller", false, "run controller service")
runNode = flag.Bool("node", false, "run node service")
kubeconfigPath = flag.String("kubeconfig-path", "", "The kubeconfig path.")
identityPool = flag.String("identity-pool", "", "The Identity Pool to authenticate with GCS API.")
identityProvider = flag.String("identity-provider", "", "The Identity Provider to authenticate with GCS API.")
enableProfiling = flag.Bool("enable-profiling", false, "enable the golang pprof at port 6060")
informerResyncDurationSec = flag.Int("informer-resync-duration-sec", 1800, "informer resync duration in seconds")

// These are set at compile time.
version = "unknown"
Expand Down Expand Up @@ -73,9 +74,9 @@ func main() {
}()
}

clientset, err := clientset.New(*kubeconfigPath)
clientset, err := clientset.New(*kubeconfigPath, *informerResyncDurationSec)
if err != nil {
klog.Fatal("Failed to configure k8s client")
klog.Fatalf("Failed to configure k8s client: %v", err)
}

meta, err := metadata.NewMetadataService(*identityPool, *identityProvider)
Expand All @@ -95,6 +96,8 @@ func main() {
klog.Fatalf("NodeID cannot be empty for node service")
}

clientset.ConfigurePodLister(*nodeID)

mounter, err = csimounter.New("")
if err != nil {
klog.Fatalf("Failed to prepare CSI mounter: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion deploy/base/node/node_setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ metadata:
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get"]
verbs: ["get", "watch", "list"]
- apiGroups: [""]
resources: ["serviceaccounts"]
verbs: ["get"]
Expand Down
44 changes: 36 additions & 8 deletions pkg/cloud_provider/clientset/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@ package clientset

import (
"context"
"errors"
"fmt"
"time"

authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

type Interface interface {
GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error)
ConfigurePodLister(nodeName string)
GetPod(namespace, name string) (*corev1.Pod, error)
CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
GetGCPServiceAccountName(ctx context.Context, namespace, name string) (string, error)
}
Expand All @@ -42,10 +47,12 @@ type PodInfo struct {
}

type Clientset struct {
k8sClients kubernetes.Interface
k8sClients kubernetes.Interface
podLister listersv1.PodLister
informerResyncDurationSec int
}

func New(kubeconfigPath string) (Interface, error) {
func New(kubeconfigPath string, informerResyncDurationSec int) (Interface, error) {
var err error
var rc *rest.Config
if kubeconfigPath != "" {
Expand All @@ -56,19 +63,40 @@ func New(kubeconfigPath string) (Interface, error) {
rc, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("Failed to read kubeconfig: %v", err)
return nil, fmt.Errorf("failed to read kubeconfig: %w", err)
}

clientset, err := kubernetes.NewForConfig(rc)
if err != nil {
klog.Fatal("failed to configure k8s client")
return nil, fmt.Errorf("failed to configure k8s client: %w", err)
}

return &Clientset{k8sClients: clientset}, nil
return &Clientset{k8sClients: clientset, informerResyncDurationSec: informerResyncDurationSec}, nil
}

func (c *Clientset) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
return c.k8sClients.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
func (c *Clientset) ConfigurePodLister(nodeName string) {
informerFactory := informers.NewSharedInformerFactoryWithOptions(
c.k8sClients,
time.Duration(c.informerResyncDurationSec)*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = "spec.nodeName=" + nodeName
}),
)
podLister := informerFactory.Core().V1().Pods().Lister()

ctx := context.Background()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())

c.podLister = podLister
}

func (c *Clientset) GetPod(namespace, name string) (*corev1.Pod, error) {
if c.podLister == nil {
return nil, errors.New("pod informer is not ready")
}

return c.podLister.Pods(namespace).Get(name)
}

func (c *Clientset) CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cloud_provider/clientset/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (

type FakeClientset struct{}

func (c *FakeClientset) GetPod(_ context.Context, namespace, name string) (*corev1.Pod, error) {
func (c *FakeClientset) ConfigurePodLister(_ string) {}

func (c *FakeClientset) GetPod(namespace, name string) (*corev1.Pod, error) {
config := webhook.FakeConfig()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion pkg/csi_driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
}

// Check if the sidecar container was injected into the Pod
pod, err := s.k8sClients.GetPod(ctx, vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName])
pod, err := s.k8sClients.GetPod(vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName])
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to get pod: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var _ = func() bool {
flag.Parse()
framework.AfterReadingAllFlags(&framework.TestContext)

c, err = clientset.New(framework.TestContext.KubeConfig)
c, err = clientset.New(framework.TestContext.KubeConfig, 0)
if err != nil {
klog.Fatalf("Failed to configure k8s client: %v", err)
}
Expand Down

0 comments on commit 2a62228

Please sign in to comment.