From ea0a0d314b4d7ae02383adb6f8b74cee5ae330d4 Mon Sep 17 00:00:00 2001 From: Imran Pochi Date: Fri, 25 Oct 2024 17:52:01 +0000 Subject: [PATCH] make lease label and lease namesapce configurable Signed-off-by: Imran Pochi --- cmd/agent/app/options/options.go | 18 ++++++++ cmd/agent/app/server.go | 5 +-- cmd/server/app/options/options.go | 18 ++++++++ cmd/server/app/server.go | 10 +++-- pkg/util/util.go | 27 +++++++++++ pkg/util/util_test.go | 74 +++++++++++++++++++++++++++++++ 6 files changed, 146 insertions(+), 6 deletions(-) create mode 100644 pkg/util/util.go create mode 100644 pkg/util/util_test.go diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index f164d15cd..3f0950963 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -85,6 +85,10 @@ type GrpcProxyAgentOptions struct { // Enables updating the server count by counting the number of valid leases // matching the selector. CountServerLeases bool + // Namespace where lease objects are managed. + LeaseNamespace string + // Labels on which lease objects are managed. + LeaseLabel string // Path to kubeconfig (used by kubernetes client for lease listing) KubeconfigPath string // Content type of requests sent to apiserver. @@ -132,6 +136,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.") flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.") flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.") + flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.") + flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file") flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") return flags @@ -159,6 +165,9 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers)) klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit) klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever) + klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases) + klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace) + klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize) klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) } @@ -216,6 +225,13 @@ func (o *GrpcProxyAgentOptions) Validate() error { return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err) } } + // Validate labels provided. + if o.CountServerLeases { + _, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + } return nil } @@ -263,6 +279,8 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { SyncForever: false, XfrChannelSize: 150, CountServerLeases: false, + LeaseNamespace: "kube-system", + LeaseLabel: "k8s-app=konnectivity-server", KubeconfigPath: "", APIContentType: runtime.ContentTypeProtobuf, } diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index c98dd7cae..83ee3afbb 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -53,7 +53,6 @@ import ( const ( ReadHeaderTimeout = 60 * time.Second - LeaseNamespace = "kube-system" LeaseInformerResync = time.Second * 10 ) @@ -163,11 +162,11 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st if err != nil { return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err) } - leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync) + leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, o.LeaseNamespace, LeaseInformerResync) go leaseInformer.Run(stopCh) cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced) leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer()) - serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server") + serverLeaseSelector, _ := labels.Parse(o.LeaseLabel) serverLeaseCounter := agent.NewServerLeaseCounter( clock.RealClock{}, leaseLister, diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index de0b89902..f5a26621c 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -108,6 +108,10 @@ type ProxyRunOptions struct { // Lease controller configuration EnableLeaseController bool + // Lease Namespace + LeaseNamespace string + // Lease Labels + LeaseLabel string } func (o *ProxyRunOptions) Flags() *pflag.FlagSet { @@ -146,6 +150,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.") flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.") flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.") + flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.") + flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.") flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.") @@ -184,6 +190,9 @@ func (o *ProxyRunOptions) Print() { klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst) klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies) + klog.V(1).Infof("EnableLeaseController set to %v.\n", o.EnableLeaseController) + klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace) + klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites) klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize) } @@ -321,6 +330,13 @@ func (o *ProxyRunOptions) Validate() error { } } } + // Validate labels provided. + if o.EnableLeaseController { + _, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + } return nil } @@ -361,6 +377,8 @@ func NewProxyRunOptions() *ProxyRunOptions { CipherSuites: make([]string, 0), XfrChannelSize: 10, EnableLeaseController: false, + LeaseNamespace: "kube-system", + LeaseLabel: "k8s-app=konnectivity-server", } return &o } diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index d96cb105b..a9c265f9d 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -57,7 +57,6 @@ const ( LeaseDuration = 30 * time.Second LeaseRenewalInterval = 15 * time.Second LeaseGCInterval = 15 * time.Second - LeaseNamespace = "kube-system" ) func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command { @@ -156,6 +155,11 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { } defer p.agentServer.Stop() + labels, err := util.ParseLabels(o.LeaseLabel) + if err != nil { + return err + } + if o.EnableLeaseController { leaseController := leases.NewController( k8sClient, @@ -164,8 +168,8 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { LeaseRenewalInterval, LeaseGCInterval, fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID), - LeaseNamespace, - map[string]string{"k8s-app": "konnectivity-server"}, + o.LeaseNamespace, + labels, ) klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.") leaseController.Run(ctx) diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 000000000..77e694d5f --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,27 @@ +package util + +import ( + "fmt" + "strings" +) + +// ParseLabels takes a comma-separated string of key-value pairs and returns a map of labels. +func ParseLabels(labelStr string) (map[string]string, error) { + labels := make(map[string]string) + + if len(labelStr) == 0 { + return labels, fmt.Errorf("empty string provided") + } + pairs := strings.Split(labelStr, ",") + + for _, pair := range pairs { + keyValue := strings.Split(pair, "=") + if len(keyValue) != 2 { + return nil, fmt.Errorf("invalid label format: %s", pair) + } + key := strings.TrimSpace(keyValue[0]) + value := strings.TrimSpace(keyValue[1]) + labels[key] = value + } + return labels, nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 000000000..82fc56d30 --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,74 @@ +package util + +import ( + "testing" +) + +func TestParseLabels(t *testing.T) { + testCases := []struct { + input string + expectedOutput map[string]string + shouldError bool + }{ + { + input: "app=myapp,env=prod,version=1.0", + expectedOutput: map[string]string{ + "app": "myapp", + "env": "prod", + "version": "1.0", + }, + shouldError: false, + }, + { + input: "app=myapp,env=prod,invalid", + expectedOutput: nil, + shouldError: true, + }, + { + input: "app=myapp", + expectedOutput: map[string]string{ + "app": "myapp", + }, + shouldError: false, + }, + { + input: "", + expectedOutput: map[string]string{}, + shouldError: true, + }, + { + input: " key = value , another = test ", + expectedOutput: map[string]string{ + "key": "value", + "another": "test", + }, + shouldError: false, + }, + } + + for _, tc := range testCases { + output, err := ParseLabels(tc.input) + + // Check for unexpected errors or missing errors + if tc.shouldError && err == nil { + t.Errorf("expected error for input %q but got none", tc.input) + continue + } + if !tc.shouldError && err != nil { + t.Errorf("did not expect error for input %q but got: %v", tc.input, err) + continue + } + + // Compare maps if there was no error + if !tc.shouldError { + if len(output) != len(tc.expectedOutput) { + t.Errorf("for input %q, expected map length %d but got %d", tc.input, len(tc.expectedOutput), len(output)) + } + for key, expectedValue := range tc.expectedOutput { + if output[key] != expectedValue { + t.Errorf("for input %q, expected %q=%q but got %q=%q", tc.input, key, expectedValue, key, output[key]) + } + } + } + } +}