Skip to content

Commit

Permalink
Metric Agent: cache fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 16, 2023
1 parent 9f72be9 commit e9072db
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 50 deletions.
44 changes: 32 additions & 12 deletions cmd/metric-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package main is the entrypoint of the metric-agent.
package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -49,6 +49,8 @@ func main() {

keyPath := flag.String("key-path", "server.key", "Path to the key file")
certPath := flag.String("cert-path", "server.crt", "Path to the certificate file")
readTimeout := flag.Duration("read-timeout", 0, "Read timeout")
writeTimeout := flag.Duration("write-timeout", 0, "Write timeout")
port := flag.Int("port", 8443, "Port to listen on")

klog.InitFlags(nil)
Expand All @@ -61,16 +63,23 @@ func main() {
kcl := kubernetes.NewForConfigOrDie(config)

scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
if err := corev1.AddToScheme(scheme); err != nil {
klog.Errorf("error adding client-go scheme: %s", err)
os.Exit(1)
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil)
if err != nil {
klog.Fatalf("mapper: %s", err)
klog.Errorf("mapper: %s", err)
os.Exit(1)
}

podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey,
selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)
if err != nil {
klog.Errorf("error creating label requirement: %s", err)
os.Exit(1)
}

cacheOptions := &cache.Options{
Scheme: scheme,
Expand All @@ -82,21 +91,32 @@ func main() {
},
}
if err != nil {
klog.Fatalf("error creating cache: %s", err)
klog.Errorf("error creating pod cache: %s", err)
os.Exit(1)
}

cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions)
cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, liqoMapper, config, cacheOptions)
if err != nil {
klog.Fatal(err)
klog.Errorf("error creating client: %s", err)
os.Exit(1)
}

router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl)
if err != nil {
klog.Fatal(err)
klog.Errorf("error creating http handler: %s", err)
os.Exit(1)
}

err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router)
if err != nil {
klog.Fatal("ListenAndServe: ", err)
server := http.Server{
Addr: fmt.Sprintf(":%d", *port),
Handler: router,
ReadTimeout: *readTimeout,
WriteTimeout: *writeTimeout,
}

klog.Infof("starting server on port %d", *port)
if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil {
klog.Errorf("error starting server: %s", err)
os.Exit(1)
}
}
1 change: 1 addition & 0 deletions deployments/liqo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
| gateway.service.nodePort | object | `{"port":""}` | Options valid if service type is NodePort. |
| gateway.service.nodePort.port | string | `""` | Force the port used by the NodePort service. |
| gateway.service.type | string | `"LoadBalancer"` | Kubernetes service to be used to expose the network gateway pod. If you plan to use liqo over the Internet, consider to change this field to "LoadBalancer". Instead, if your nodes are directly reachable from the cluster you are peering to, you may change it to "NodePort". |
| metricAgent.config.timeout | object | `{"read":"30s","write":"30s"}` | Set the timeout for the metrics server. |
| metricAgent.enable | bool | `true` | Enable/Disable the virtual kubelet metric agent. This component aggregates all the kubelet-related metrics (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting the resulting values as a property of the virtual kubelet running on the remote cluster. |
| metricAgent.imageName | string | `"ghcr.io/liqotech/metric-agent"` | Image repository for the metricAgent pod. |
| metricAgent.initContainer.imageName | string | `"ghcr.io/liqotech/cert-creator"` | Image repository for the authentication init container for the metricAgent pod. |
Expand Down
2 changes: 2 additions & 0 deletions deployments/liqo/templates/liqo-metric-agent-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
args:
- --key-path=/certs/key.pem
- --cert-path=/certs/cert.pem
- --read-timeout={{ .Values.metricAgent.config.timeout.read }}
- --write-timeout={{ .Values.metricAgent.config.timeout.write }}
resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }}
volumeMounts:
- mountPath: '/certs'
Expand Down
5 changes: 5 additions & 0 deletions deployments/liqo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ metricAgent:
# (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting
# the resulting values as a property of the virtual kubelet running on the remote cluster.
enable: true
config:
# -- Set the timeout for the metrics server.
timeout:
read: 30s
write: 30s
pod:
# -- Annotations for the metricAgent pod.
annotations: {}
Expand Down
72 changes: 34 additions & 38 deletions pkg/utils/clients/get_cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,56 @@ package clients
import (
"context"
"fmt"
"os"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"

"github.com/liqotech/liqo/pkg/utils/mapper"
)

// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to
// the scheme. The necessary rest.Config is generated inside this function.
func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client, error) {
conf := ctrl.GetConfigOrDie()
if conf == nil {
err := fmt.Errorf("unable to get the config file")
klog.Error(err)
return nil, err
}

return GetCachedClientWithConfig(ctx, scheme, conf, nil)
}

// GetCachedClientWithConfig returns a controller runtime client with the cache initialized only for the resources added to
// the scheme. The necessary rest.Config is passed as third parameter, it must not be nil.
func GetCachedClientWithConfig(ctx context.Context,
scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) {
if conf == nil {
err := fmt.Errorf("the rest.Config parameter is nil")
klog.Error(err)
return nil, err
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil)
scheme *runtime.Scheme, mapper meta.RESTMapper, conf *rest.Config, cacheOptions *ctrlcache.Options) (client.Client, error) {
cache, err := ctrlcache.New(conf, *cacheOptions)
if err != nil {
klog.Errorf("mapper: %s", err)
return nil, err
return nil, fmt.Errorf("unable to create the pod cache: %w", err)
}

c, err := cluster.New(conf, func(o *cluster.Options) {
o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper}
if cacheOptions != nil {
o.Cache = *cacheOptions
} else {
o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper}
go func() {
klog.Info("starting pod cache")
if err := cache.Start(ctx); err != nil {
klog.Errorf("error starting pod cache: %s", err)
os.Exit(1)
}
}()

klog.Info("waiting for pod cache sync")

if ok := cache.WaitForCacheSync(ctx); !ok {
return nil, fmt.Errorf("unable to sync pod cache")
}

klog.Info("pod cache synced")

if conf == nil {
return nil, fmt.Errorf("the rest.Config parameter is nil")
}

cl, err := client.New(conf, client.Options{
Scheme: scheme,
Mapper: mapper,
Cache: &client.CacheOptions{
Reader: cache,
},
})

if err != nil {
klog.Errorf("unable to create the client: %s", err)
return nil, err
return nil, fmt.Errorf("unable to create the client: %w", err)
}

newClient := c.GetClient()
return newClient, nil
return cl, nil
}

0 comments on commit e9072db

Please sign in to comment.