Skip to content

Commit

Permalink
Metric Agent: cache fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 13, 2023
1 parent 9f72be9 commit 415bf79
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 32 deletions.
40 changes: 30 additions & 10 deletions cmd/metric-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package main is the entrypoint of the metric-agent.
package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -49,6 +49,8 @@ func main() {

keyPath := flag.String("key-path", "server.key", "Path to the key file")
certPath := flag.String("cert-path", "server.crt", "Path to the certificate file")
readTimeout := flag.Duration("read-timeout", 0, "Read timeout")
writeTimeout := flag.Duration("write-timeout", 0, "Write timeout")
port := flag.Int("port", 8443, "Port to listen on")

klog.InitFlags(nil)
Expand All @@ -61,16 +63,23 @@ func main() {
kcl := kubernetes.NewForConfigOrDie(config)

scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
if err := corev1.AddToScheme(scheme); err != nil {
klog.Errorf("error adding client-go scheme: %s", err)
os.Exit(1)
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil)
if err != nil {
klog.Fatalf("mapper: %s", err)
klog.Errorf("mapper: %s", err)
os.Exit(1)
}

podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey,
selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)
if err != nil {
klog.Errorf("error creating label requirement: %s", err)
os.Exit(1)
}

cacheOptions := &cache.Options{
Scheme: scheme,
Expand All @@ -82,21 +91,32 @@ func main() {
},
}
if err != nil {
klog.Fatalf("error creating cache: %s", err)
klog.Errorf("error creating pod cache: %s", err)
os.Exit(1)
}

cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions)
if err != nil {
klog.Fatal(err)
klog.Errorf("error creating client: %s", err)
os.Exit(1)
}

router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl)
if err != nil {
klog.Fatal(err)
os.Exit(1)
}

err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router)
if err != nil {
klog.Fatal("ListenAndServe: ", err)
server := http.Server{
Addr: fmt.Sprintf(":%d", *port),
Handler: router,
ReadTimeout: *readTimeout,
WriteTimeout: *writeTimeout,
}

klog.Infof("starting server on port %d", *port)
if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil {
klog.Fatal(err)
os.Exit(1)
}
}
2 changes: 2 additions & 0 deletions deployments/liqo/templates/liqo-metric-agent-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
args:
- --key-path=/certs/key.pem
- --cert-path=/certs/cert.pem
- --read-timeout={{ .Values.metricAgent.config.timeout }}
- --write-timeout={{ .Values.metricAgent.config.timeout }}
resources: {{- toYaml .Values.metricAgent.pod.resources | nindent 12 }}
volumeMounts:
- mountPath: '/certs'
Expand Down
2 changes: 2 additions & 0 deletions deployments/liqo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ metricAgent:
# (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting
# the resulting values as a property of the virtual kubelet running on the remote cluster.
enable: true
config:
timeout: 30s
pod:
# -- Annotations for the metricAgent pod.
annotations: {}
Expand Down
47 changes: 25 additions & 22 deletions pkg/utils/clients/get_cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package clients
import (
"context"
"fmt"
"os"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"

"github.com/liqotech/liqo/pkg/utils/mapper"
)

// GetCachedClient returns a controller runtime client with the cache initialized only for the resources added to
Expand All @@ -47,31 +45,36 @@ func GetCachedClient(ctx context.Context, scheme *runtime.Scheme) (client.Client
// the scheme. The necessary rest.Config is passed as third parameter, it must not be nil.
func GetCachedClientWithConfig(ctx context.Context,
scheme *runtime.Scheme, conf *rest.Config, cacheOptions *cache.Options) (client.Client, error) {
if conf == nil {
err := fmt.Errorf("the rest.Config parameter is nil")
klog.Error(err)
return nil, err
podcache, err := cache.New(conf, *cacheOptions)

Check failure on line 48 in pkg/utils/clients/get_cached_client.go

View workflow job for this annotation

GitHub Actions / Lint golang files

ineffectual assignment to err (ineffassign)
go func() {
klog.Info("starting pod cache")
if err := podcache.Start(ctx); err != nil {
klog.Errorf("error starting pod cache: %s", err)
os.Exit(1)
}
}()

klog.Info("waiting for pod cache sync")

if ok := podcache.WaitForCacheSync(ctx); !ok {
return nil, fmt.Errorf("unable to sync pod cache")
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(conf, nil)
if err != nil {
klog.Errorf("mapper: %s", err)
return nil, err
klog.Info("pod cache synced")

if conf == nil {
return nil, fmt.Errorf("the rest.Config parameter is nil")
}

c, err := cluster.New(conf, func(o *cluster.Options) {
o.Client = client.Options{Scheme: scheme, Mapper: liqoMapper}
if cacheOptions != nil {
o.Cache = *cacheOptions
} else {
o.Cache = cache.Options{Scheme: scheme, Mapper: liqoMapper}
}
cl, err := client.New(conf, client.Options{
Cache: &client.CacheOptions{
Reader: podcache,
},
})

if err != nil {
klog.Errorf("unable to create the client: %s", err)
return nil, err
return nil, fmt.Errorf("unable to create the client: %w", err)
}

newClient := c.GetClient()
return newClient, nil
return cl, nil
}

0 comments on commit 415bf79

Please sign in to comment.