Skip to content

Commit

Permalink
cr-syncer: switch to log/slog (#288)
Browse files Browse the repository at this point in the history
  • Loading branch information
therjak authored Jan 15, 2024
1 parent aebb257 commit cb3a8aa
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/go/cmd/cr-syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/googlecloudrobotics/core/src/go/cmd/cr-syncer",
visibility = ["//visibility:private"],
deps = [
"@com_github_googlecloudrobotics_ilog//:go_default_library",
"@com_github_motemen_go_loghttp//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions src/go/cmd/cr-syncer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"context"
"log"
"log/slog"
"net/http"

"github.com/googlecloudrobotics/ilog"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -37,7 +38,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If this becomes a problem, we could do the requests in the
// background and just check the status of the latest request here.
if _, err := h.client.Resource(gvr).List(h.ctx, metav1.ListOptions{Limit: 1}); k8serrors.IsUnauthorized(err) {
log.Printf("failed health check: %v", err)
slog.Error("failed health check", ilog.Err(err))
http.Error(w, "unhealthy", http.StatusInternalServerError)
return
}
Expand Down
47 changes: 32 additions & 15 deletions src/go/cmd/cr-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@
//
// Annotation "filter-by-robot-name"
//
// cr-syncer.cloudrobotics.com/filter-by-robot-name: <bool>
// cr-syncer.cloudrobotics.com/filter-by-robot-name: <bool>
//
// If true, only sync CRs that have a label 'cloudrobotics.com/robot-name: <robot-name>'
// that matches the robot-name arg given on the command line.
//
// Annotation "status-subtree"
//
// cr-syncer.cloudrobotics.com/status-subtree: <string>
// cr-syncer.cloudrobotics.com/status-subtree: <string>
//
// If specified, only sync the given subtree of the Status field. This is useful
// if resources have a shared status.
//
// Annotation "spec-source"
//
// cr-syncer.cloudrobotics.com/spec-source: <string>
// cr-syncer.cloudrobotics.com/spec-source: <string>
//
// If set to "cloud", the source of truth for object existence and specs (upstream) is
// the remote cluster and for status it's local (downstream). If set to "robot", the roles
Expand All @@ -44,12 +44,14 @@ package main
import (
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"strings"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
"github.com/googlecloudrobotics/ilog"
"github.com/motemen/go-loghttp"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -200,7 +202,7 @@ func streamCrds(done <-chan struct{}, clientset crdclientset.Interface, crds cha

go informer.Run(done)

log.Printf("Syncing cache for CRDs")
slog.Info("Syncing cache for CRDs")
ok := cache.WaitForCacheSync(done, informer.HasSynced)
if !ok {
return fmt.Errorf("WaitForCacheSync failed")
Expand All @@ -225,13 +227,22 @@ func main() {
flag.Parse()
ctx := context.Background()

ll := slog.LevelInfo
if *verbose {
ll = slog.LevelDebug
}
logHandler := ilog.NewLogHandler(ll, os.Stderr)
slog.SetDefault(slog.New(logHandler))

localConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
slog.Error("InClusterConfig", ilog.Err(err))
os.Exit(1)
}
localCtx, err := tag.New(ctx, tag.Insert(tagLocation, "local"))
if err != nil {
log.Fatal(err)
slog.Error("tag.New", ilog.Err(err))
os.Exit(1)
}
localConfig.WrapTransport = func(base http.RoundTripper) http.RoundTripper {
if *verbose {
Expand All @@ -242,20 +253,24 @@ func main() {
}
local, err := dynamic.NewForConfig(localConfig)
if err != nil {
log.Fatal(err)
slog.Error("NewForConfig", ilog.Err(err))
os.Exit(1)
}
remoteConfig, err := restConfigForRemote(ctx)
if err != nil {
log.Fatal(err)
slog.Error("restConfigForRemote", ilog.Err(err))
os.Exit(1)
}
remote, err := dynamic.NewForConfig(remoteConfig)
if err != nil {
log.Fatal(err)
slog.Error("NewForConfig", ilog.Err(err))
os.Exit(1)
}

exporter, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
log.Fatal(err)
slog.Error("NewExporter", ilog.Err(err))
os.Exit(1)
}
view.RegisterExporter(exporter)
view.SetReportingPeriod(time.Second)
Expand All @@ -265,21 +280,23 @@ func main() {

go func() {
if err := http.ListenAndServe(*listenAddr, nil); err != nil {
log.Fatalln(err)
slog.Error("ListenAndServe", ilog.Err(err))
os.Exit(1)
}
}()

crds := make(chan CrdChange)
if err := streamCrds(ctx.Done(), crdclientset.NewForConfigOrDie(localConfig), crds); err != nil {
log.Fatalf("Unable to stream CRDs from local Kubernetes: %v", err)
slog.Error("Unable to stream CRDs from local Kubernetes", ilog.Err(err))
os.Exit(1)
}
syncers := make(map[string]*crSyncer)
for crd := range crds {
name := crd.CRD.GetName()

if cur, ok := syncers[name]; ok {
if crd.Type == watch.Added {
log.Printf("Warning: Already had a running sync for freshly added %s", name)
slog.Warn("Already had a running sync", slog.String("syncer", name))
}
cur.stop()
delete(syncers, name)
Expand All @@ -292,7 +309,7 @@ func main() {
// instead.
s, err := newCRSyncer(ctx, *crd.CRD, local, remote, *robotName)
if err != nil {
log.Printf("skipping custom resource %s: %s", name, err)
slog.Info("skipping custom resource", slog.String("Resource", name), ilog.Err(err))
continue
}
syncers[name] = s
Expand Down
52 changes: 35 additions & 17 deletions src/go/cmd/cr-syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"strconv"
"time"

"github.com/googlecloudrobotics/ilog"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -114,7 +116,7 @@ func removeFinalizer(ctx context.Context, client dynamic.ResourceInterface, obj
if isNotFoundError(err) {
return
}
log.Printf("failed to remove finalizers: %v", err)
slog.Error("failed to remove finalizers", ilog.Err(err))
}
}

Expand Down Expand Up @@ -169,8 +171,10 @@ func newCRSyncer(
)
if filterByRobotValue != "" {
if v, err := strconv.ParseBool(filterByRobotValue); err != nil {
log.Printf("Value for %s must be boolean on %s, got %q",
annotationFilterByRobotName, crd.ObjectMeta.Name, filterByRobotValue)
slog.Error("Value must be boolean",
slog.String("Filter", annotationFilterByRobotName),
slog.String("Target", crd.ObjectMeta.Name),
slog.String("Got", filterByRobotValue))
} else {
filterByRobot = v
}
Expand Down Expand Up @@ -222,7 +226,7 @@ func newCRSyncer(
s.labelSelector = labelRobotName + "=" + robotName
} else {
// TODO(fabxc): should this return an error instead?
log.Printf("%s requested to filter by robot-name, but no robot-name was given to cr-syncer", crd.ObjectMeta.Name)
slog.Warn("request to filter by robot-name, but no robot-name was given to cr-syncer", slog.String("Requester", crd.ObjectMeta.Name))
}
}

Expand Down Expand Up @@ -294,8 +298,12 @@ func (s *crSyncer) setupInformerHandlers(
) {
receive := func(obj interface{}, action string) {
u := obj.(*unstructured.Unstructured)
log.Printf("Got %s event from %s for %s %s@v%s",
action, direction, u.GetKind(), u.GetName(), u.GetResourceVersion())
slog.Info("Got Event",
slog.String("Event", action),
slog.String("Direction", direction),
slog.String("Kind", u.GetKind()),
slog.String("Name", u.GetName()),
slog.String("Version", u.GetResourceVersion()))
if key, ok := keyFunc(obj); ok {
queue.AddRateLimited(key)
}
Expand Down Expand Up @@ -332,10 +340,10 @@ func (s *crSyncer) processNextWorkItem(
// This could occur at watchers of single CRDs while others keep working. Thus, it is less resource intensive to restart just the informers of the affected CRDs rather than the whole cr-syncer.
// Errors are counted in syncUpstream and syncDownstream functions
if s.conflictErrors >= *conflictErrorLimit {
log.Printf("Restarting informers of %s because of too many conflict errors", s.crd.GetName())
slog.Info("Restarting informers because of too many conflict errors", slog.String("CRD", s.crd.GetName()))
err := s.restartInformers()
if err != nil {
log.Printf("Restarting informers for %s failed", s.crd.GetName())
slog.Warn("Restarting informers failed", slog.String("CRD", s.crd.GetName()))
q.AddRateLimited(key)
return true
} else {
Expand All @@ -355,7 +363,10 @@ func (s *crSyncer) processNextWorkItem(
}
// Synchronization failed, retry later.
stats.Record(ctx, mSyncErrors.M(1))
log.Printf("Syncing key %q from queue %q failed: %v", key, qName, err)
slog.Warn("Syncing key from queue failed",
slog.Any("Key", key),
slog.String("Queue", qName),
ilog.Err(err))
q.AddRateLimited(key)

return true
Expand All @@ -365,11 +376,11 @@ func (s *crSyncer) run() {
defer s.upstreamQueue.ShutDown()
defer s.downstreamQueue.ShutDown()

log.Printf("Starting syncer for %s", s.crd.GetName())
slog.Info("Starting syncer", slog.String("CRD", s.crd.GetName()))

// Start informers that will populate their associated workqueue.
if err := s.startInformers(); err != nil {
log.Printf("Starting informers for %s failed: %s", s.crd.GetName(), err)
slog.Warn("Starting informers failed", slog.String("CRD", s.crd.GetName()), ilog.Err(err))
return
}

Expand All @@ -394,7 +405,7 @@ func (s *crSyncer) run() {
}

func (s *crSyncer) stop() {
log.Printf("Stopping syncer for %s", s.crd.GetName())
slog.Info("Stopping syncer", slog.String("CRD", s.crd.GetName()))
close(s.done)
}

Expand Down Expand Up @@ -497,8 +508,12 @@ func (s *crSyncer) syncDownstream(key string) error {
if s.clusterName != cloudClusterName {
s.conflictErrors = 0
}
log.Printf("Copied %s %s status@v%s to upstream@v%s",
src.GetKind(), src.GetName(), src.GetResourceVersion(), dst.GetResourceVersion())
slog.Info("Copied status to upstream",
slog.String("Kind", src.GetKind()),
slog.String("Name", src.GetName()),
slog.Any("Source version", src.GetResourceVersion()),
slog.Any("Destination version", dst.GetResourceVersion()))

return nil
}

Expand Down Expand Up @@ -558,7 +573,10 @@ func (s *crSyncer) syncUpstream(key string) error {
}
return nil
default:
log.Fatalf("unhandled condition: srcExists=%t, dstExists=%t", srcExists, dstExists)
slog.Error("unhandled condition",
slog.Bool("srcExists", srcExists),
slog.Bool("dstExists", dstExists))
os.Exit(1)
return nil
}

Expand Down Expand Up @@ -621,7 +639,7 @@ func newAPIErrorf(o *unstructured.Unstructured, format string, args ...interface
func keyFunc(obj interface{}) (string, bool) {
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Printf("deriving key failed: %s", err)
slog.Warn("deriving key failed", ilog.Err(err))
return k, false
}
return k, true
Expand Down

0 comments on commit cb3a8aa

Please sign in to comment.