From 7f9f816043bd127ec0d0440851c627259565840d Mon Sep 17 00:00:00 2001
From: Francesco Cheinasso
Date: Thu, 12 Oct 2023 19:29:18 +0200
Subject: [PATCH] Metric Agent: cache fix

---
 cmd/metric-agent/main.go                      | 44 ++++++++----
 deployments/liqo/README.md                    |  1 +
 .../liqo-metric-agent-deployment.yaml         |  2 +
 deployments/liqo/values.yaml                  |  5 ++
 pkg/utils/clients/get_cached_client.go        | 72 +++++++++----------
 5 files changed, 74 insertions(+), 50 deletions(-)

diff --git a/cmd/metric-agent/main.go b/cmd/metric-agent/main.go
index 43f65b99eb..27469fda34 100644
--- a/cmd/metric-agent/main.go
+++ b/cmd/metric-agent/main.go
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Package main is the entrypoint of the metric-agent.
 package main
 
 import (
@@ -19,14 +20,13 @@ import (
 	"context"
 	"flag"
 	"fmt"
 	"net/http"
+	"os"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/selection"
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/kubernetes"
-	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -49,6 +49,8 @@ func main() {
 
 	keyPath := flag.String("key-path", "server.key", "Path to the key file")
 	certPath := flag.String("cert-path", "server.crt", "Path to the certificate file")
+	readTimeout := flag.Duration("read-timeout", 0, "Read timeout")
+	writeTimeout := flag.Duration("write-timeout", 0, "Write timeout")
 	port := flag.Int("port", 8443, "Port to listen on")
 
 	klog.InitFlags(nil)
@@ -61,16 +63,23 @@ func main() {
 	kcl := kubernetes.NewForConfigOrDie(config)
 
 	scheme := runtime.NewScheme()
-	_ = clientgoscheme.AddToScheme(scheme)
+	if err := corev1.AddToScheme(scheme); err != nil {
+		klog.Errorf("error adding client-go scheme: %s", err)
+		os.Exit(1)
+	}
 
 	liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil)
 	if err != nil {
-		klog.Fatalf("mapper: %s", err)
+		klog.Errorf("mapper: %s", err)
+		os.Exit(1)
 	}
 
 	podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey,
 		selection.Equals, []string{consts.ManagedByShadowPodValue})
-	utilruntime.Must(err)
+	if err != nil {
+		klog.Errorf("error creating label requirement: %s", err)
+		os.Exit(1)
+	}
 
 	cacheOptions := &cache.Options{
 		Scheme: scheme,
@@ -82,21 +91,32 @@ func main() {
 		},
 	}
 	if err != nil {
-		klog.Fatalf("error creating cache: %s", err)
+		klog.Errorf("error creating pod cache: %s", err)
+		os.Exit(1)
 	}
 
-	cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions)
+	cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, liqoMapper, config, cacheOptions)
 	if err != nil {
-		klog.Fatal(err)
+		klog.Errorf("error creating client: %s", err)
+		os.Exit(1)
 	}
 
 	router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl)
 	if err != nil {
-		klog.Fatal(err)
+		klog.Errorf("error creating http handler: %s", err)
+		os.Exit(1)
 	}
 
-	err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router)
-	if err != nil {
-		klog.Fatal("ListenAndServe: ", err)
+	server := http.Server{
+		Addr:         fmt.Sprintf(":%d", *port),
+		Handler:      router,
+		ReadTimeout:  *readTimeout,
+		WriteTimeout: *writeTimeout,
+	}
+
+	klog.Infof("starting server on port %d", *port)
+	if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil {
+		klog.Errorf("error starting server: %s", err)
+		os.Exit(1)
 	}
 }
diff --git a/deployments/liqo/README.md b/deployments/liqo/README.md
index cacab56554..6b5fe74ef8 100644
--- a/deployments/liqo/README.md
+++ b/deployments/liqo/README.md
@@ -94,6 +94,7 @@
 | gateway.service.nodePort | object | `{"port":""}` | Options valid if service type is NodePort. |
 | gateway.service.nodePort.port | string | `""` | Force the port used by the NodePort service. |
 | gateway.service.type | string | `"LoadBalancer"` | Kubernetes service to be used to expose the network gateway pod. If you plan to use liqo over the Internet, consider to change this field to "LoadBalancer". Instead, if your nodes are directly reachable from the cluster you are peering to, you may change it to "NodePort". |
+| metricAgent.config.timeout | object | `{"read":"30s","write":"30s"}` | Set the timeout for the metrics server. |
 | metricAgent.enable | bool | `true` | Enable/Disable the virtual kubelet metric agent. This component aggregates all the kubelet-related metrics (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting the resulting values as a property of the virtual kubelet running on the remote cluster. |
 | metricAgent.imageName | string | `"ghcr.io/liqotech/metric-agent"` | Image repository for the metricAgent pod. |
 | metricAgent.initContainer.imageName | string | `"ghcr.io/liqotech/cert-creator"` | Image repository for the authentication init container for the metricAgent pod. |
diff --git a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml
index 2606e286f2..4535edb362 100644
--- a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml
+++ b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml
@@ -65,6 +65,8 @@ spec:
           args:
             - --key-path=/certs/key.pem
             - --cert-path=/certs/cert.pem
+            - --read-timeout={{ .Values.metricAgent.config.timeout.read }}
+            - --write-timeout={{ .Values.metricAgent.config.timeout.write }}
           resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }}
           volumeMounts:
             - mountPath: '/certs'
diff --git a/deployments/liqo/values.yaml b/deployments/liqo/values.yaml
index 13f5e52e87..6edf9fdcc6 100644
--- a/deployments/liqo/values.yaml
+++ b/deployments/liqo/values.yaml
@@ -379,6 +379,11 @@ metricAgent:
   # (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting
   # the resulting values as a property of the virtual kubelet running on the remote cluster.
   enable: true
+  config:
+    # -- Set the timeout for the metrics server.
+    timeout:
+      read: 30s
+      write: 30s
   pod:
     # -- Annotations for the metricAgent pod.
     annotations: {}
diff --git a/pkg/utils/clients/get_cached_client.go b/pkg/utils/clients/get_cached_client.go
index 00d72950be..509de86801 100644
--- a/pkg/utils/clients/get_cached_client.go
+++ b/pkg/utils/clients/get_cached_client.go
@@ -18,60 +18,56 @@ package clients
 import (
 	"context"
 	"fmt"
+	"os"
 
+	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
-	ctrl "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/cache"
+	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/cluster"
-
-	"github.com/liqotech/liqo/pkg/utils/mapper"
 )
 
-// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to
-// the scheme. The necessary rest.Config is generated inside this function.
-func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client, error) {
-	conf := ctrl.GetConfigOrDie()
-	if conf == nil {
-		err := fmt.Errorf("unable to get the config file")
-		klog.Error(err)
-		return nil, err
-	}
-
-	return GetCachedClientWithConfig(ctx, scheme, conf, nil)
-}
-
 // GetCachedClientWithConfig returns a controller runtime client with the cache initialized only for the resources added to
 // the scheme. The necessary rest.Config is passed as third parameter, it must not be nil.
 func GetCachedClientWithConfig(ctx context.Context,
-	scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) {
-	if conf == nil {
-		err := fmt.Errorf("the rest.Config parameter is nil")
-		klog.Error(err)
-		return nil, err
-	}
-
-	liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil)
+	scheme *runtime.Scheme, mapper meta.RESTMapper, conf *rest.Config, cacheOptions *ctrlcache.Options) (client.Client, error) {
+	cache, err := ctrlcache.New(conf, *cacheOptions)
 	if err != nil {
-		klog.Errorf("mapper: %s", err)
-		return nil, err
+		return nil, fmt.Errorf("unable to create the pod cache: %w", err)
 	}
 
-	c, err := cluster.New(conf, func(o *cluster.Options) {
-		o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper}
-		if cacheOptions != nil {
-			o.Cache = *cacheOptions
-		} else {
-			o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper}
+	go func() {
+		klog.Info("starting pod cache")
+		if err := cache.Start(ctx); err != nil {
+			klog.Errorf("error starting pod cache: %s", err)
+			os.Exit(1)
 		}
+	}()
+
+	klog.Info("waiting for pod cache sync")
+
+	if ok := cache.WaitForCacheSync(ctx); !ok {
+		return nil, fmt.Errorf("unable to sync pod cache")
+	}
+
+	klog.Info("pod cache synced")
+
+	if conf == nil {
+		return nil, fmt.Errorf("the rest.Config parameter is nil")
+	}
+
+	cl, err := client.New(conf, client.Options{
+		Scheme: scheme,
+		Mapper: mapper,
+		Cache: &client.CacheOptions{
+			Reader: cache,
+		},
 	})
+
 	if err != nil {
-		klog.Errorf("unable to create the client: %s", err)
-		return nil, err
+		return nil, fmt.Errorf("unable to create the client: %w", err)
 	}
 
-	newClient := c.GetClient()
-	return newClient, nil
+	return cl, nil
 }