Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric Agent: cache fix #2080

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions cmd/metric-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package main is the entrypoint of the metric-agent.
package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -49,6 +49,8 @@ func main() {

keyPath := flag.String("key-path", "server.key", "Path to the key file")
certPath := flag.String("cert-path", "server.crt", "Path to the certificate file")
readTimeout := flag.Duration("read-timeout", 0, "Read timeout")
writeTimeout := flag.Duration("write-timeout", 0, "Write timeout")
port := flag.Int("port", 8443, "Port to listen on")

klog.InitFlags(nil)
Expand All @@ -61,16 +63,23 @@ func main() {
kcl := kubernetes.NewForConfigOrDie(config)

scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
if err := corev1.AddToScheme(scheme); err != nil {
klog.Errorf("error adding client-go scheme: %s", err)
os.Exit(1)
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil)
if err != nil {
klog.Fatalf("mapper: %s", err)
klog.Errorf("mapper: %s", err)
os.Exit(1)
}

podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey,
selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)
if err != nil {
klog.Errorf("error creating label requirement: %s", err)
os.Exit(1)
}

cacheOptions := &cache.Options{
Scheme: scheme,
Expand All @@ -82,21 +91,32 @@ func main() {
},
}
if err != nil {
klog.Fatalf("error creating cache: %s", err)
klog.Errorf("error creating pod cache: %s", err)
os.Exit(1)
}

cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions)
cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, liqoMapper, config, cacheOptions)
if err != nil {
klog.Fatal(err)
klog.Errorf("error creating client: %s", err)
os.Exit(1)
}

router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl)
if err != nil {
klog.Fatal(err)
klog.Errorf("error creating http handler: %s", err)
os.Exit(1)
}

err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router)
if err != nil {
klog.Fatal("ListenAndServe: ", err)
server := http.Server{
Addr: fmt.Sprintf(":%d", *port),
Handler: router,
ReadTimeout: *readTimeout,
WriteTimeout: *writeTimeout,
}

klog.Infof("starting server on port %d", *port)
if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil {
klog.Errorf("error starting server: %s", err)
os.Exit(1)
}
}
1 change: 1 addition & 0 deletions deployments/liqo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
| gateway.service.nodePort | object | `{"port":""}` | Options valid if service type is NodePort. |
| gateway.service.nodePort.port | string | `""` | Force the port used by the NodePort service. |
| gateway.service.type | string | `"LoadBalancer"` | Kubernetes service to be used to expose the network gateway pod. If you plan to use liqo over the Internet, consider changing this field to "LoadBalancer". Instead, if your nodes are directly reachable from the cluster you are peering to, you may change it to "NodePort". |
| metricAgent.config.timeout | object | `{"read":"30s","write":"30s"}` | Set the timeout for the metrics server. |
| metricAgent.enable | bool | `true` | Enable/Disable the virtual kubelet metric agent. This component aggregates all the kubelet-related metrics (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, and then exports the resulting values as a property of the virtual kubelet running on the remote cluster. |
| metricAgent.imageName | string | `"ghcr.io/liqotech/metric-agent"` | Image repository for the metricAgent pod. |
| metricAgent.initContainer.imageName | string | `"ghcr.io/liqotech/cert-creator"` | Image repository for the authentication init container for the metricAgent pod. |
Expand Down
2 changes: 2 additions & 0 deletions deployments/liqo/templates/liqo-metric-agent-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
args:
- --key-path=/certs/key.pem
- --cert-path=/certs/cert.pem
- --read-timeout={{ .Values.metricAgent.config.timeout.read }}
- --write-timeout={{ .Values.metricAgent.config.timeout.write }}
resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }}
volumeMounts:
- mountPath: '/certs'
Expand Down
5 changes: 5 additions & 0 deletions deployments/liqo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ metricAgent:
# (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, and then exports
# the resulting values as a property of the virtual kubelet running on the remote cluster.
enable: true
config:
# -- Set the timeout for the metrics server.
timeout:
read: 30s
write: 30s
pod:
# -- Annotations for the metricAgent pod.
annotations: {}
Expand Down
72 changes: 34 additions & 38 deletions pkg/utils/clients/get_cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,56 @@ package clients
import (
"context"
"fmt"
"os"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"

"github.com/liqotech/liqo/pkg/utils/mapper"
)

// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to
// the scheme. The necessary rest.Config is generated inside this function.
func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client, error) {
conf := ctrl.GetConfigOrDie()
if conf == nil {
err := fmt.Errorf("unable to get the config file")
klog.Error(err)
return nil, err
}

return GetCachedClientWithConfig(ctx, scheme, conf, nil)
}

// GetCachedClientWithConfig returns a controller runtime client with the cache initialized only for the resources added to
// the scheme. The necessary rest.Config is passed as the conf parameter; it must not be nil.
func GetCachedClientWithConfig(ctx context.Context,
scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) {
if conf == nil {
err := fmt.Errorf("the rest.Config parameter is nil")
klog.Error(err)
return nil, err
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil)
scheme *runtime.Scheme, mapper meta.RESTMapper, conf *rest.Config, cacheOptions *ctrlcache.Options) (client.Client, error) {
cache, err := ctrlcache.New(conf, *cacheOptions)
if err != nil {
klog.Errorf("mapper: %s", err)
return nil, err
return nil, fmt.Errorf("unable to create the pod cache: %w", err)
}

c, err := cluster.New(conf, func(o *cluster.Options) {
o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper}
if cacheOptions != nil {
o.Cache = *cacheOptions
} else {
o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper}
go func() {
klog.Info("starting pod cache")
if err := cache.Start(ctx); err != nil {
klog.Errorf("error starting pod cache: %s", err)
os.Exit(1)
}
}()

klog.Info("waiting for pod cache sync")

if ok := cache.WaitForCacheSync(ctx); !ok {
return nil, fmt.Errorf("unable to sync pod cache")
}

klog.Info("pod cache synced")

if conf == nil {
return nil, fmt.Errorf("the rest.Config parameter is nil")
}

cl, err := client.New(conf, client.Options{
Scheme: scheme,
Mapper: mapper,
Cache: &client.CacheOptions{
Reader: cache,
},
})

if err != nil {
klog.Errorf("unable to create the client: %s", err)
return nil, err
return nil, fmt.Errorf("unable to create the client: %w", err)
}

newClient := c.GetClient()
return newClient, nil
return cl, nil
}