From c619d31ba1000d48f87552ccda1f98af6a433697 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Thu, 12 Oct 2023 19:29:18 +0200 Subject: [PATCH] Metric Agent: cache fix --- cmd/metric-agent/main.go | 42 ++++++++---- .../liqo-metric-agent-deployment.yaml | 2 + deployments/liqo/values.yaml | 2 + pkg/utils/clients/get_cached_client.go | 66 ++++++++----------- 4 files changed, 64 insertions(+), 48 deletions(-) diff --git a/cmd/metric-agent/main.go b/cmd/metric-agent/main.go index 43f65b99eb..7fa5510ad9 100644 --- a/cmd/metric-agent/main.go +++ b/cmd/metric-agent/main.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package main is the entrypoint of the metric-agent. package main import ( @@ -19,14 +20,13 @@ import ( "flag" "fmt" "net/http" + "os" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -49,6 +49,8 @@ func main() { keyPath := flag.String("key-path", "server.key", "Path to the key file") certPath := flag.String("cert-path", "server.crt", "Path to the certificate file") + readTimeout := flag.Duration("read-timeout", 0, "Read timeout") + writeTimeout := flag.Duration("write-timeout", 0, "Write timeout") port := flag.Int("port", 8443, "Port to listen on") klog.InitFlags(nil) @@ -61,16 +63,23 @@ func main() { kcl := kubernetes.NewForConfigOrDie(config) scheme := runtime.NewScheme() - _ = clientgoscheme.AddToScheme(scheme) + if err := corev1.AddToScheme(scheme); err != nil { + klog.Errorf("error adding client-go scheme: %s", err) + os.Exit(1) + } liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil) if err != nil { - klog.Fatalf("mapper: %s", err) + klog.Errorf("mapper: %s", err) + os.Exit(1) } podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue}) - utilruntime.Must(err) + if err != nil { + klog.Errorf("error creating label requirement: %s", err) + os.Exit(1) + } cacheOptions := &cache.Options{ Scheme: scheme, @@ -82,21 +91,32 @@ func main() { }, } if err != nil { - klog.Fatalf("error creating cache: %s", err) + klog.Errorf("error creating pod cache: %s", err) + os.Exit(1) } - cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions) + cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, liqoMapper, config, cacheOptions) if err != nil { - klog.Fatal(err) + klog.Errorf("error creating client: %s", err) + os.Exit(1) } router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl) if err != nil { klog.Fatal(err) + os.Exit(1) } - err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router) - if err != nil { - klog.Fatal("ListenAndServe: ", err) + server := http.Server{ + Addr: fmt.Sprintf(":%d", *port), + Handler: router, + ReadTimeout: *readTimeout, + WriteTimeout: *writeTimeout, + } + + klog.Infof("starting server on port %d", *port) + if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil { + klog.Fatal(err) + os.Exit(1) } } diff --git a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml index 2606e286f2..328e3c0164 100644 --- a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml +++ b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml @@ -65,6 +65,8 @@ spec: args: - --key-path=/certs/key.pem - --cert-path=/certs/cert.pem + - --read-timeout={{ .Values.metricAgent.config.timeout }} + - --write-timeout={{ .Values.metricAgent.config.timeout }} resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }} volumeMounts: - mountPath: '/certs' diff --git a/deployments/liqo/values.yaml b/deployments/liqo/values.yaml index 908d90390a..d96dc55aa1 100644 --- a/deployments/liqo/values.yaml +++ b/deployments/liqo/values.yaml @@ -377,6 +377,8 @@ metricAgent: # (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting # the resulting values as a property of the virtual kubelet running on the remote cluster. enable: true + config: + timeout: 30s pod: # -- Annotations for the metricAgent pod. annotations: {} diff --git a/pkg/utils/clients/get_cached_client.go b/pkg/utils/clients/get_cached_client.go index 00d72950be..d149125ba5 100644 --- a/pkg/utils/clients/get_cached_client.go +++ b/pkg/utils/clients/get_cached_client.go @@ -18,60 +18,52 @@ package clients import ( "context" "fmt" + "os" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" - - "github.com/liqotech/liqo/pkg/utils/mapper" ) -// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to -// the scheme. The necessary rest.Config is generated inside this function. -func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client, error) { - conf := ctrl.GetConfigOrDie() - if conf == nil { - err := fmt.Errorf("unable to get the config file") - klog.Error(err) - return nil, err - } - - return GetCachedClientWithConfig(ctx, scheme, conf, nil) -} - // GetCachedClientWithConfig returns a controller runtime client with the cache initialized only for the resources added to // the scheme. The necessary rest.Config is passed as third parameter, it must not be nil. func GetCachedClientWithConfig(ctx context.Context, - scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) { - if conf == nil { - err := fmt.Errorf("the rest.Config parameter is nil") - klog.Error(err) - return nil, err + scheme *runtime.Scheme, mapper meta.RESTMapper, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) { + podcache, err := cache.New(conf, *cacheOptions) + go func() { + klog.Info("starting pod cache") + if err := podcache.Start(ctx); err != nil { + klog.Errorf("error starting pod cache: %s", err) + os.Exit(1) + } + }() + + klog.Info("waiting for pod cache sync") + + if ok := podcache.WaitForCacheSync(ctx); !ok { + return nil, fmt.Errorf("unable to sync pod cache") } - liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil) - if err != nil { - klog.Errorf("mapper: %s", err) - return nil, err + klog.Info("pod cache synced") + + if conf == nil { + return nil, fmt.Errorf("the rest.Config parameter is nil") } - c, err := cluster.New(conf, func(o *cluster.Options) { - o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper} - if cacheOptions != nil { - o.Cache = *cacheOptions - } else { - o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper} - } + cl, err := client.New(conf, client.Options{ + Scheme: scheme, + Mapper: mapper, + Cache: &client.CacheOptions{ + Reader: podcache, + }, }) + if err != nil { - klog.Errorf("unable to create the client: %s", err) - return nil, err + return nil, fmt.Errorf("unable to create the client: %w", err) } - newClient := c.GetClient() - return newClient, nil + return cl, nil }