Skip to content

Commit

Permalink
Metric Agent: cache fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 12, 2023
1 parent 9f72be9 commit e4371e3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 89 deletions.
66 changes: 54 additions & 12 deletions cmd/metric-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package main is the entrypoint of the metric-agent.
package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -35,7 +35,6 @@ import (

"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/remotemetrics"
clientutils "github.com/liqotech/liqo/pkg/utils/clients"
"github.com/liqotech/liqo/pkg/utils/mapper"
"github.com/liqotech/liqo/pkg/utils/restcfg"
)
Expand All @@ -49,6 +48,8 @@ func main() {

keyPath := flag.String("key-path", "server.key", "Path to the key file")
certPath := flag.String("cert-path", "server.crt", "Path to the certificate file")
readTimeout := flag.Duration("read-timeout", 0, "Read timeout")
writeTimeout := flag.Duration("write-timeout", 0, "Write timeout")
port := flag.Int("port", 8443, "Port to listen on")

klog.InitFlags(nil)
Expand All @@ -61,42 +62,83 @@ func main() {
kcl := kubernetes.NewForConfigOrDie(config)

scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
if err := corev1.AddToScheme(scheme); err != nil {
klog.Fatalf("error adding client-go scheme: %s", err)
os.Exit(1)
}

liqoMapper, err := (mapper.LiqoMapperProvider(scheme))(config, nil)
if err != nil {
klog.Fatalf("mapper: %s", err)
os.Exit(1)
}

podsLabelRequirement, err := labels.NewRequirement(consts.ManagedByLabelKey,
selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)
if err != nil {
klog.Fatalf("error creating label requirement: %s", err)
os.Exit(1)
}

cacheOptions := &cache.Options{
podcache, err := cache.New(config, cache.Options{
Scheme: scheme,
Mapper: liqoMapper,
ByObject: map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.NewSelector().Add(*podsLabelRequirement),
},
},
})

go func() {
klog.Info("starting pod cache")
if err := podcache.Start(ctx); err != nil {
klog.Fatalf("error starting pod cache: %s", err)
os.Exit(1)
}
}()

klog.Info("waiting for pod cache sync")

if ok := podcache.WaitForCacheSync(ctx); !ok {
klog.Fatalf("error waiting for cache sync: %s", err)
os.Exit(1)
}

klog.Info("pod cache synced")

if err != nil {
klog.Fatalf("error creating cache: %s", err)
klog.Fatalf("error creating pod cache: %s", err)
os.Exit(1)
}

cl, err := clientutils.GetCachedClientWithConfig(ctx, scheme, config, cacheOptions)
cl, err := client.New(config, client.Options{
Cache: &client.CacheOptions{
Reader: podcache,
},
})

if err != nil {
klog.Fatal(err)
klog.Fatalf("error creating client: %s", err)
os.Exit(1)
}

router, err := remotemetrics.GetHTTPHandler(kcl.RESTClient(), cl)
if err != nil {
klog.Fatal(err)
os.Exit(1)
}

err = http.ListenAndServeTLS(fmt.Sprintf(":%d", *port), *certPath, *keyPath, router)
if err != nil {
klog.Fatal("ListenAndServe: ", err)
server := http.Server{
Addr: fmt.Sprintf(":%d", *port),
Handler: router,
ReadTimeout: *readTimeout,
WriteTimeout: *writeTimeout,
}

klog.Infof("starting server on port %d", *port)
if err := server.ListenAndServeTLS(*certPath, *keyPath); err != nil {
klog.Fatal(err)
os.Exit(1)
}
}
77 changes: 0 additions & 77 deletions pkg/utils/clients/get_cached_client.go

This file was deleted.

0 comments on commit e4371e3

Please sign in to comment.