From 75c349f7c5acb539c9012ef3125b98dc025892a2 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Thu, 12 Oct 2023 19:29:18 +0200 Subject: [PATCH] Metric Agent: cache fix --- cmd/metric-agent/main.go | 44 ++++++++---- deployments/liqo/README.md | 1 + .../liqo-metric-agent-deployment.yaml | 2 + deployments/liqo/values.yaml | 4 ++ pkg/utils/clients/get_cached_client.go | 72 +++++++++---------- 5 files changed, 73 insertions(+), 50 deletions(-) diff --git a/cmd/metric-agent/main.go b/cmd/metric-agent/main.go index 43f65b99eb..27469fda34 100644 --- a/cmd/metric-agent/main.go +++ b/cmd/metric-agent/main.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package main is the entrypoint of the metric-agent. package main import ( @@ -19,14 +20,13 @@ import ( "flag" "fmt" "net/http" + "os" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -49,6 +49,8 @@ func main() { keyPath := flag.String("key-path", "server.key", "Path to the key file") certPath := flag.String("cert-path", "server.crt", "Path to the certificate file") + readTimeout := flag.Duration("read-timeout", 0, "Read timeout") + writeTimeout := flag.Duration("write-timeout", 0, "Write timeout") port := flag.Int("port", 8443, "Port to listen on") klog.InitFlags(nil) @@ -61,16 +63,23 @@ func main() { kcl := kubernetes.NewForConfigOrDie(config) scheme := runtime.NewScheme() - _ = clientgoscheme.AddToScheme(scheme) + if err := corev1.AddToScheme(scheme); err != nil { + klog.Errorf("error adding client-go scheme: %s", err) + os.Exit(1) + } liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil) if err != nil { - klog.Fatalf("mapper: %s", err) + klog.Errorf("mapper: %s", err) + os.Exit(1) } podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue}) - utilruntime.Must(err) + if err != nil { + klog.Errorf("error creating label requirement: %s", err) + os.Exit(1) + } cacheOptions := &cache.Options{ Scheme: scheme, @@ -82,21 +91,32 @@ func main() { }, } if err != nil { - klog.Fatalf("error creating cache: %s", err) + klog.Errorf("error creating pod cache: %s", err) + os.Exit(1) } - cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions) + cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, liqoMapper, config, cacheOptions) if err != nil { - klog.Fatal(err) + klog.Errorf("error creating client: %s", err) + os.Exit(1) } router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl) if err != nil { - klog.Fatal(err) + klog.Errorf("error creating http handler: %s", err) + os.Exit(1) } - err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router) - if err != nil { - klog.Fatal("ListenAndServe: ", err) + server := http.Server{ + Addr: fmt.Sprintf(":%d", *port), + Handler: router, + ReadTimeout: *readTimeout, + WriteTimeout: *writeTimeout, + } + + klog.Infof("starting server on port %d", *port) + if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil { + klog.Errorf("error starting server: %s", err) + os.Exit(1) } } diff --git a/deployments/liqo/README.md b/deployments/liqo/README.md index 32d0215070..59963bf482 100644 --- a/deployments/liqo/README.md +++ b/deployments/liqo/README.md @@ -94,6 +94,7 @@ | gateway.service.nodePort | object | `{"port":""}` | Options valid if service type is NodePort. | | gateway.service.nodePort.port | string | `""` | Force the port used by the NodePort service. | | gateway.service.type | string | `"LoadBalancer"` | Kubernetes service to be used to expose the network gateway pod. If you plan to use liqo over the Internet, consider to change this field to "LoadBalancer". Instead, if your nodes are directly reachable from the cluster you are peering to, you may change it to "NodePort". | +| metricAgent.config.timeout | string | `"30s"` | | | metricAgent.enable | bool | `true` | Enable/Disable the virtual kubelet metric agent. This component aggregates all the kubelet-related metrics (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting the resulting values as a property of the virtual kubelet running on the remote cluster. | | metricAgent.imageName | string | `"ghcr.io/liqotech/metric-agent"` | Image repository for the metricAgent pod. | | metricAgent.initContainer.imageName | string | `"ghcr.io/liqotech/cert-creator"` | Image repository for the authentication init container for the metricAgent pod. | diff --git a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml index 2606e286f2..4535edb362 100644 --- a/deployments/liqo/templates/liqo-metric-agent-deployment.yaml +++ b/deployments/liqo/templates/liqo-metric-agent-deployment.yaml @@ -65,6 +65,8 @@ spec: args: - --key-path=/certs/key.pem - --cert-path=/certs/cert.pem + - --read-timeout={{ .Values.metricAgent.config.timeout.read }} + - --write-timeout={{ .Values.metricAgent.config.timeout.write }} resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }} volumeMounts: - mountPath: '/certs' diff --git a/deployments/liqo/values.yaml b/deployments/liqo/values.yaml index 908d90390a..73fed2a591 100644 --- a/deployments/liqo/values.yaml +++ b/deployments/liqo/values.yaml @@ -377,6 +377,10 @@ metricAgent: # (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting # the resulting values as a property of the virtual kubelet running on the remote cluster. enable: true + config: + timeout: + read: 30s + write: 30s pod: # -- Annotations for the metricAgent pod. annotations: {} diff --git a/pkg/utils/clients/get_cached_client.go b/pkg/utils/clients/get_cached_client.go index 00d72950be..509de86801 100644 --- a/pkg/utils/clients/get_cached_client.go +++ b/pkg/utils/clients/get_cached_client.go @@ -18,60 +18,56 @@ package clients import ( "context" "fmt" + "os" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" - - "github.com/liqotech/liqo/pkg/utils/mapper" ) -// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to -// the scheme. The necessary rest.Config is generated inside this function. -func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client, error) { - conf := ctrl.GetConfigOrDie() - if conf == nil { - err := fmt.Errorf("unable to get the config file") - klog.Error(err) - return nil, err - } - - return GetCachedClientWithConfig(ctx, scheme, conf, nil) -} - // GetCachedClientWithConfig returns a controller runtime client with the cache initialized only for the resources added to // the scheme. The necessary rest.Config is passed as third parameter, it must not be nil. func GetCachedClientWithConfig(ctx context.Context, - scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) { - if conf == nil { - err := fmt.Errorf("the rest.Config parameter is nil") - klog.Error(err) - return nil, err - } - - liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil) + scheme *runtime.Scheme, mapper meta.RESTMapper, conf *rest.Config, cacheOptions *ctrlcache.Options) (client.Client, error) { + cache, err := ctrlcache.New(conf, *cacheOptions) if err != nil { - klog.Errorf("mapper: %s", err) - return nil, err + return nil, fmt.Errorf("unable to create the pod cache: %w", err) } - c, err := cluster.New(conf, func(o *cluster.Options) { - o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper} - if cacheOptions != nil { - o.Cache = *cacheOptions - } else { - o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper} + go func() { + klog.Info("starting pod cache") + if err := cache.Start(ctx); err != nil { + klog.Errorf("error starting pod cache: %s", err) + os.Exit(1) } + }() + + klog.Info("waiting for pod cache sync") + + if ok := cache.WaitForCacheSync(ctx); !ok { + return nil, fmt.Errorf("unable to sync pod cache") + } + + klog.Info("pod cache synced") + + if conf == nil { + return nil, fmt.Errorf("the rest.Config parameter is nil") + } + + cl, err := client.New(conf, client.Options{ + Scheme: scheme, + Mapper: mapper, + Cache: &client.CacheOptions{ + Reader: cache, + }, }) + if err != nil { - klog.Errorf("unable to create the client: %s", err) - return nil, err + return nil, fmt.Errorf("unable to create the client: %w", err) } - newClient := c.GetClient() - return newClient, nil + return cl, nil }