From abab3242d2bbe6b2318a8e067ff4d0e69279ed91 Mon Sep 17 00:00:00 2001 From: Alex Masi Date: Thu, 13 Jul 2023 12:55:15 -0700 Subject: [PATCH] Add event publishing to kne_cli and controller server (#393) * Add viper to read kne config * Add config file + pubsub reporting * Switch cli flag reading to use viper * fix unit test * fmt * Controller server can report usage * Fix unit tests for progress option * update deploy and topo libs * Use new metrics pkg * add unit test * Add section of readme explaining usage metrics reporting * Refactor to address comments * fix typo * update readme --- .gitignore | 1 + README.md | 22 ++++++ cmd/deploy/deploy.go | 3 + cmd/root.go | 17 ++++- controller/server/main.go | 15 +++- controller/server/main_test.go | 10 +++ deploy/deploy.go | 73 +++++++++++++++++++- go.mod | 1 + topo/topo.go | 74 ++++++++++++++++++++ topo/topo_test.go | 121 ++++++++++++++++++++++++++------- 10 files changed, 306 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index 25364a08..f15f0f93 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ /go.work.sum **/super-linter.log kne_cli/kne_cli +controller/server/server diff --git a/README.md b/README.md index 97fa0497..d4981a24 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,28 @@ number of production topologies. See the collection of [docs](docs/README.md) for in depth guides on how use Kubernetes Network Emulation (KNE). +## Disclaimers + +### Usage Metrics Reporting + +The KNE CLI optionally collects anonymous usage metrics. **This is turned OFF +by default.** We use the metrics to gauge the health and performance of various +KNE operations (e.g. cluster deployment, topology creation) on an **opt-in** +basis. There is a global flag `--report_usage` that, when provided, shares +anonymous details about certain KNE CLI commands. Collected data can be seen in +the [event proto definition](proto/event.proto). 
**Usage metrics are NOT shared +by default.** Additionally, the PubSub project and topic the events are published +to are configurable. If you want to track your own private metrics about your +KNE usage then that is supported by providing a Cloud PubSub project/topic of +your choosing. Full details about how/when usage events are published can be +found in the codebase [here](kne/metrics/metrics.go). We appreciate usage metric +reporting as it helps us develop a better KNE experience for all of our users, +whether that be detecting an abnormally high number of cluster deployment +failures due to an upgrade to an underlying dependency introduced by a new +commit, or detecting a bug from a scenario where the failure rate for topologies +over *n* links is far greater than for *n-1* links. Usage metric reporting is a +helpful tool for the KNE developers. + ## Thanks This project is mainly based on the k8s-topo from github.com/networkop/k8s-topo diff --git a/cmd/deploy/deploy.go b/cmd/deploy/deploy.go index 3bc580b8..f0d1ec53 100644 --- a/cmd/deploy/deploy.go +++ b/cmd/deploy/deploy.go @@ -102,6 +102,9 @@ func deployFn(cmd *cobra.Command, args []string) error { return err } d.Progress = viper.GetBool("progress") + d.ReportUsage = viper.GetBool("report_usage") + d.ReportUsageProjectID = viper.GetString("report_usage_project_id") + d.ReportUsageTopicID = viper.GetString("report_usage_topic_id") if err := d.Deploy(cmd.Context(), viper.GetString("kubecfg")); err != nil { return err } diff --git a/cmd/root.go b/cmd/root.go index 50be18ae..8365b98c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -41,6 +41,9 @@ environment.`, root.SetOut(os.Stdout) cfgFile := root.PersistentFlags().String("config_file", defaultCfgFile(), "Path to KNE config file") root.PersistentFlags().String("kubecfg", defaultKubeCfg(), "kubeconfig file") + root.PersistentFlags().Bool("report_usage", false, "Whether to reporting anonymous usage metrics") + root.PersistentFlags().String("report_usage_project_id", "", 
"Project to report anonymous usage metrics to") + root.PersistentFlags().String("report_usage_topic_id", "", "Topic to report anonymous usage metrics to") root.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { if *cfgFile == "" { return nil @@ -52,6 +55,7 @@ environment.`, } } viper.BindPFlags(cmd.Flags()) + viper.SetDefault("report_usage", false) return nil } root.AddCommand(newCreateCmd()) @@ -140,8 +144,17 @@ func createFn(cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("%s: %w", cmd.Use, err) } - tm, err := topo.New(topopb, topo.WithKubecfg(viper.GetString("kubecfg")), topo.WithBasePath(bp), - topo.WithProgress(viper.GetBool("progress"))) + opts := []topo.Option{ + topo.WithKubecfg(viper.GetString("kubecfg")), + topo.WithBasePath(bp), + topo.WithProgress(viper.GetBool("progress")), + topo.WithUsageReporting( + viper.GetBool("report_usage"), + viper.GetString("report_usage_project_id"), + viper.GetString("report_usage_topic_id"), + ), + } + tm, err := topo.New(topopb, opts...) if err != nil { return fmt.Errorf("%s: %w", cmd.Use, err) } diff --git a/controller/server/main.go b/controller/server/main.go index 97e16610..cc591095 100644 --- a/controller/server/main.go +++ b/controller/server/main.go @@ -49,7 +49,10 @@ var ( defaultCEOSLabOperator = "" defaultLemmingOperator = "" // Flags. 
- port = flag.Int("port", 50051, "Controller server port") + port = flag.Int("port", 50051, "Controller server port") + reportUsage = flag.Bool("report_usage", false, "Whether to reporting anonymous usage metrics") + reportUsageProjectID = flag.String("report_usage_project_id", "", "Project to report anonymous usage metrics to") + reportUsageTopicID = flag.String("report_usage_topic_id", "", "Topic to report anonymous usage metrics to") ) func init() { @@ -267,6 +270,10 @@ func newDeployment(req *cpb.CreateClusterRequest) (*deploy.Deployment, error) { return nil, fmt.Errorf("controller type not supported: %T", t) } } + d.Progress = true + d.ReportUsage = *reportUsage + d.ReportUsageProjectID = *reportUsageProjectID + d.ReportUsageTopicID = *reportUsageTopicID return d, nil } @@ -369,7 +376,11 @@ func (s *server) CreateTopology(ctx context.Context, req *cpb.CreateTopologyRequ if err != nil { return nil, status.Errorf(codes.InvalidArgument, "kubecfg %q does not exist: %v", path, err) } - tm, err := topo.New(topoPb, topo.WithKubecfg(kcfg)) + opts := []topo.Option{ + topo.WithKubecfg(kcfg), + topo.WithUsageReporting(*reportUsage, *reportUsageProjectID, *reportUsageTopicID), + } + tm, err := topo.New(topoPb, opts...) 
if err != nil { return nil, status.Errorf(codes.Internal, "failed to create topology manager: %v", err) } diff --git a/controller/server/main_test.go b/controller/server/main_test.go index 94cfdf84..881db80d 100644 --- a/controller/server/main_test.go +++ b/controller/server/main_test.go @@ -114,6 +114,7 @@ func TestNewDeployment(t *testing.T) { CNI: &deploy.MeshnetSpec{ Manifest: testFile.Name(), }, + Progress: true, }, }, { desc: "request spec - with ixiatg controller", @@ -182,6 +183,7 @@ func TestNewDeployment(t *testing.T) { ConfigMap: testFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with srlinux controller", @@ -244,6 +246,7 @@ func TestNewDeployment(t *testing.T) { Operator: testFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with ceoslab controller", @@ -306,6 +309,7 @@ func TestNewDeployment(t *testing.T) { Operator: testFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with lemming controller", @@ -368,6 +372,7 @@ func TestNewDeployment(t *testing.T) { Operator: testFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with multiple controllers empty filepath", @@ -466,6 +471,7 @@ func TestNewDeployment(t *testing.T) { Operator: defTestFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with multiple controllers", @@ -578,6 +584,7 @@ func TestNewDeployment(t *testing.T) { Operator: testFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - with multiple controllers data", @@ -690,6 +697,7 @@ func TestNewDeployment(t *testing.T) { OperatorData: testData, }, }, + Progress: true, }, }, { desc: "request spec - without ixiatg config map", @@ -753,6 +761,7 @@ func TestNewDeployment(t *testing.T) { ConfigMap: defTestFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - default ixiatg config map dne", @@ -842,6 +851,7 @@ func TestNewDeployment(t *testing.T) { Operator: defTestFile.Name(), }, }, + Progress: true, }, }, { desc: "request spec - default 
manifest paths dne", diff --git a/deploy/deploy.go b/deploy/deploy.go index 9a2d7323..8dc8a9eb 100644 --- a/deploy/deploy.go +++ b/deploy/deploy.go @@ -22,7 +22,9 @@ import ( kexec "github.com/openconfig/kne/exec" "github.com/openconfig/kne/load" logshim "github.com/openconfig/kne/logshim" + "github.com/openconfig/kne/metrics" "github.com/openconfig/kne/pods" + epb "github.com/openconfig/kne/proto/event" metallbv1 "go.universe.tf/metallb/api/v1beta1" "golang.org/x/oauth2/google" appsv1 "k8s.io/api/apps/v1" @@ -123,6 +125,20 @@ type Deployment struct { // If Progress is true then deployment status updates will be sent to // standard output. Progress bool + + // If ReportUsage is true then anonymous usage metrics will be + // published using Cloud PubSub. + ReportUsage bool + // ReportUsageProjectID is the ID of the GCP project the usage + // metrics should be written to. This field is not used if + // ReportUsage is unset. An empty string will result in the + // default project being used. + ReportUsageProjectID string + // ReportUsageTopicID is the ID of the GCP PubSub topic the usage + // metrics should be written to. This field is not used if + // ReportUsage is unset. An empty string will result in the + // default topic being used. + ReportUsageTopicID string } func (d *Deployment) String() string { @@ -146,7 +162,62 @@ type kubeVersion struct { ServerVersion *kversion.Info `json:"serverVersion,omitempty" yaml:"serverVersion,omitempty"` } +// event turns the deployment into a cluster event protobuf. 
+func (d *Deployment) event() *epb.Cluster { + c := &epb.Cluster{} + switch d.Cluster.(type) { + case *ExternalSpec: + c.Cluster = epb.Cluster_CLUSTER_TYPE_EXTERNAL + case *KindSpec: + c.Cluster = epb.Cluster_CLUSTER_TYPE_KIND + } + switch d.Ingress.(type) { + case *MetalLBSpec: + c.Ingress = epb.Cluster_INGRESS_TYPE_METALLB + } + switch d.CNI.(type) { + case *MeshnetSpec: + c.Cni = epb.Cluster_CNI_TYPE_MESHNET + } + for _, cntrl := range d.Controllers { + switch cntrl.(type) { + case *CEOSLabSpec: + c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_CEOSLAB) + case *IxiaTGSpec: + c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_IXIATG) + case *SRLinuxSpec: + c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_SRLINUX) + case *LemmingSpec: + c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_LEMMING) + } + } + return c +} + +func (d *Deployment) reportDeployEvent(ctx context.Context) func(error) { + r, err := metrics.NewReporter(ctx, d.ReportUsageProjectID, d.ReportUsageTopicID) + if err != nil { + log.Warningf("Unable to create metrics reporter: %v", err) + return func(_ error) {} + } + id, err := r.ReportDeployClusterStart(ctx, d.event()) + if err != nil { + log.Warningf("Unable to report cluster deployment start event: %v", err) + return func(_ error) { r.Close() } + } + return func(rerr error) { + defer r.Close() + if err := r.ReportDeployClusterEnd(ctx, id, rerr); err != nil { + log.Warningf("Unable to report cluster deployment end event: %v", err) + } + } +} + func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) { + if d.ReportUsage { + finish := d.reportDeployEvent(ctx) + defer func() { finish(rerr) }() + } if err := d.checkDependencies(); err != nil { return err } @@ -195,7 +266,7 @@ func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) { if kClientVersion.Less(kServerVersion) { log.Warning("Kube client and server versions are not within expected 
range.") } - log.V(1).Info("Found k8s versions:\n", output) + log.V(1).Info("Found k8s versions:\n", string(output)) ctx, cancel := context.WithCancel(ctx) diff --git a/go.mod b/go.mod index bc7bfe71..6921f1ee 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 @@ require ( golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect + google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/topo/topo.go b/topo/topo.go index 66a1c16f..2e0fe3b7 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -27,8 +27,10 @@ import ( topologyclientv1 "github.com/networkop/meshnet-cni/api/clientset/v1beta1" topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1" "github.com/openconfig/kne/events" + "github.com/openconfig/kne/metrics" "github.com/openconfig/kne/pods" cpb "github.com/openconfig/kne/proto/controller" + epb "github.com/openconfig/kne/proto/event" tpb "github.com/openconfig/kne/proto/topo" "github.com/openconfig/kne/topo/node" "google.golang.org/grpc/codes" @@ -67,6 +69,19 @@ type Manager struct { tClient topologyclientv1.Interface rCfg *rest.Config basePath string + + // If reportUsage is set, report anonymous usage metrics. + reportUsage bool + // reportUsageProjectID is the ID of the GCP project the usage + // metrics should be written to. This field is not used if + // ReportUsage is unset. An empty string will result in the + // default project being used. + reportUsageProjectID string + // reportUsageTopicID is the ID of the GCP PubSub topic the usage + // metrics should be written to. This field is not used if + // ReportUsage is unset. An empty string will result in the + // default topic being used. 
+ reportUsageTopicID string } type Option func(m *Manager) @@ -101,6 +116,15 @@ func WithBasePath(s string) Option { } } +// WithUsageReporting writes anonymous usage metrics. +func WithUsageReporting(b bool, project, topic string) Option { + return func(m *Manager) { + m.reportUsage = b + m.reportUsageProjectID = project + m.reportUsageTopicID = topic + } +} + // WithProgress returns a Manager Option where true causes pod progress to be displayed. func WithProgress(b bool) Option { return func(m *Manager) { @@ -156,9 +180,59 @@ func New(topo *tpb.Topology, opts ...Option) (*Manager, error) { return m, nil } +// event creates a topology event protobuf from the topo. +func (m *Manager) event() *epb.Topology { + t := &epb.Topology{ + LinkCount: int64(len(m.topo.Links)), + } + for _, node := range m.topo.Nodes { + t.Nodes = append(t.Nodes, &epb.Node{ + Vendor: node.Vendor, + Model: node.Model, + }) + } + return t +} + +var ( + // Stubs for testing. + newMetricsReporter = func(ctx context.Context, project, topic string) (metricsReporter, error) { + return metrics.NewReporter(ctx, project, topic) + } +) + +type metricsReporter interface { + ReportCreateTopologyStart(context.Context, *epb.Topology) (string, error) + ReportCreateTopologyEnd(context.Context, string, error) error + Close() error +} + +func (m *Manager) reportCreateEvent(ctx context.Context) func(error) { + r, err := newMetricsReporter(ctx, m.reportUsageProjectID, m.reportUsageTopicID) + if err != nil { + log.Warningf("Unable to create metrics reporter: %v", err) + return func(_ error) {} + } + id, err := r.ReportCreateTopologyStart(ctx, m.event()) + if err != nil { + log.Warningf("Unable to report create topology start event: %v", err) + return func(_ error) { r.Close() } + } + return func(rerr error) { + defer r.Close() + if err := r.ReportCreateTopologyEnd(ctx, id, rerr); err != nil { + log.Warningf("Unable to report create topology end event: %v", err) + } + } +} + // Create creates the topology in the 
cluster. func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error) { log.V(1).Infof("Topology:\n%v", prototext.Format(m.topo)) + if m.reportUsage { + finish := m.reportCreateEvent(ctx) + defer func() { finish(rerr) }() + } if err := m.push(ctx); err != nil { return err } diff --git a/topo/topo_test.go b/topo/topo_test.go index 12ca526f..16f1c4cc 100644 --- a/topo/topo_test.go +++ b/topo/topo_test.go @@ -17,6 +17,7 @@ package topo import ( "bytes" "context" + "errors" "fmt" "io" "testing" @@ -28,6 +29,7 @@ import ( tfake "github.com/networkop/meshnet-cni/api/clientset/v1beta1/fake" topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1" cpb "github.com/openconfig/kne/proto/controller" + epb "github.com/openconfig/kne/proto/event" tpb "github.com/openconfig/kne/proto/topo" "github.com/openconfig/kne/topo/node" "google.golang.org/protobuf/proto" @@ -387,40 +389,39 @@ func TestNew(t *testing.T) { } } +type fakeMetricsReporter struct { + reportStartErr, reportEndErr error +} + +func (f *fakeMetricsReporter) Close() error { + return nil +} + +func (f *fakeMetricsReporter) ReportCreateTopologyStart(_ context.Context, _ *epb.Topology) (string, error) { + return "fake-uuid", f.reportStartErr +} + +func (f *fakeMetricsReporter) ReportCreateTopologyEnd(_ context.Context, _ string, _ error) error { + return f.reportEndErr +} + func TestCreate(t *testing.T) { ctx := context.Background() - tf, err := tfake.NewSimpleClientset() - if err != nil { - t.Fatalf("cannot create fake topology clientset: %v", err) - } - kf := kfake.NewSimpleClientset() - kf.PrependReactor("get", "pods", func(action ktest.Action) (bool, runtime.Object, error) { - gAction, ok := action.(ktest.GetAction) - if !ok { - return false, nil, nil - } - p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: gAction.GetName()}} - switch p.Name { - default: - p.Status.Phase = corev1.PodRunning - p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: 
corev1.ConditionTrue}} - case "bad": - p.Status.Phase = corev1.PodFailed - case "hanging": - p.Status.Phase = corev1.PodPending - } - return true, p, nil - }) - opts := []Option{ - WithClusterConfig(&rest.Config{}), - WithKubeClient(kf), - WithTopoClient(tf), + + origNewMetricsReporter := newMetricsReporter + defer func() { + newMetricsReporter = origNewMetricsReporter + }() + newMetricsReporter = func(_ context.Context, _, _ string) (metricsReporter, error) { + return &fakeMetricsReporter{reportStartErr: errors.New("start err"), reportEndErr: errors.New("end err")}, nil } + node.Vendor(tpb.Vendor(1002), NewConfigurable) tests := []struct { desc string topo *tpb.Topology timeout time.Duration + opts []Option wantErr string }{{ desc: "success", @@ -502,9 +503,77 @@ func TestCreate(t *testing.T) { }, }, wantErr: `Node "bad": Status FAILED`, + }, { + desc: "failed to report metrics, create still passes", + opts: []Option{WithUsageReporting(true, "", "")}, + topo: &tpb.Topology{ + Name: "test", + Nodes: []*tpb.Node{ + { + Name: "r1", + Vendor: tpb.Vendor(1002), + Services: map[uint32]*tpb.Service{ + 1000: { + Name: "ssh", + }, + }, + Config: &tpb.Config{}, + }, + { + Name: "r2", + Vendor: tpb.Vendor(1002), + Services: map[uint32]*tpb.Service{ + 2000: { + Name: "grpc", + }, + 3000: { + Name: "gnmi", + }, + }, + Config: &tpb.Config{}, + }, + }, + Links: []*tpb.Link{ + { + ANode: "r1", + AInt: "eth1", + ZNode: "r2", + ZInt: "eth1", + }, + }, + }, }} for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { + tf, err := tfake.NewSimpleClientset() + if err != nil { + t.Fatalf("cannot create fake topology clientset: %v", err) + } + kf := kfake.NewSimpleClientset() + kf.PrependReactor("get", "pods", func(action ktest.Action) (bool, runtime.Object, error) { + gAction, ok := action.(ktest.GetAction) + if !ok { + return false, nil, nil + } + p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: gAction.GetName()}} + switch p.Name { + default: + p.Status.Phase = 
corev1.PodRunning + p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}} + case "bad": + p.Status.Phase = corev1.PodFailed + case "hanging": + p.Status.Phase = corev1.PodPending + } + return true, p, nil + }) + + opts := []Option{ + WithClusterConfig(&rest.Config{}), + WithKubeClient(kf), + WithTopoClient(tf), + } + opts = append(opts, tt.opts...) m, err := New(tt.topo, opts...) if err != nil { t.Fatalf("New() failed to create new topology manager: %v", err)