Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#643 from carreter/server-counter-e2e
Browse files Browse the repository at this point in the history
Lease-based server counting logic for agent and lease controller for server
  • Loading branch information
k8s-ci-robot authored Aug 23, 2024
2 parents 8aac94a + 714d092 commit 707e0c9
Show file tree
Hide file tree
Showing 95 changed files with 15,627 additions and 233 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ DOCKER_CMD ?= docker
DOCKER_CLI_EXPERIMENTAL ?= enabled
PROXY_SERVER_IP ?= 127.0.0.1

KIND_IMAGE ?= kindest/node
KIND_IMAGE ?= kindest/node:v1.30.2
CONNECTION_MODE ?= grpc
## --------------------------------------
## Testing
Expand Down
16 changes: 16 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type GrpcProxyAgentOptions struct {

SyncForever bool
XfrChannelSize int

// Enables updating the server count by counting the number of valid leases
// matching the selector.
CountServerLeases bool
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
Expand Down Expand Up @@ -122,6 +128,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
return flags
}

Expand Down Expand Up @@ -198,6 +206,12 @@ func (o *GrpcProxyAgentOptions) Validate() error {
if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
return fmt.Errorf("agent address is invalid: %v", err)
}
if o.KubeconfigPath != "" {
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}

return nil
}

Expand Down Expand Up @@ -243,6 +257,8 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
CountServerLeases: false,
KubeconfigPath: "",
}
return &o
}
Expand Down
46 changes: 44 additions & 2 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,24 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

"k8s.io/utils/clock"
"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
)

const ReadHeaderTimeout = 60 * time.Second
const (
ReadHeaderTimeout = 60 * time.Second
LeaseNamespace = "kube-system"
LeaseInformerResync = time.Second * 10
)

func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -133,6 +143,38 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
}),
}
cc := o.ClientSetConfig(dialOptions...)

if o.CountServerLeases {
var config *rest.Config
if o.KubeconfigPath != "" {
config, err = clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
}
} else {
config, err = rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to load in cluster kubernetes client config: %w", err)
}
}

k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
}
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync)
go leaseInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
serverLeaseCounter := agent.NewServerLeaseCounter(
clock.RealClock{},
leaseLister,
serverLeaseSelector,
)
cc.ServerLeaseCounter = serverLeaseCounter
}

cs := cc.NewAgentClientSet(drainCh, stopCh)
cs.Serve()

Expand Down
6 changes: 5 additions & 1 deletion cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type ProxyRunOptions struct {
// see: https://pkg.go.dev/crypto/tls#Config, so in that case, this option won't have any effect.
CipherSuites []string
XfrChannelSize int

// Lease controller configuration
EnableLeaseController bool
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -138,7 +141,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")

flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -351,6 +354,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
ProxyStrategies: "default",
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
EnableLeaseController: false,
}
return &o
}
Expand Down
26 changes: 24 additions & 2 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,23 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/leases"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
)

var udsListenerLock sync.Mutex

const ReadHeaderTimeout = 60 * time.Second
const (
ReadHeaderTimeout = 60 * time.Second
LeaseDuration = 30 * time.Second
LeaseRenewalInterval = 15 * time.Second
LeaseGCInterval = 15 * time.Second
LeaseNamespace = "kube-system"
)

func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -149,6 +155,22 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
}
defer p.agentServer.Stop()

if o.EnableLeaseController {
leaseController := leases.NewController(
k8sClient,
o.ServerID,
int32(LeaseDuration.Seconds()),
LeaseRenewalInterval,
LeaseGCInterval,
fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
LeaseNamespace,
map[string]string{"k8s-app": "konnectivity-server"},
)
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
leaseController.Run(ctx)
defer leaseController.Stop()
}

klog.V(1).Infoln("Starting admin server for debug connections.")
err = p.runAdminServer(o, p.server)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ These can be run automatically using `make e2e-test`.
Before any of the actual tests are run, the `TestMain()` function
in `main_test.go` performs the following set up steps:

- Spin up a new kind cluster with the node image provided by the `-kind-image` flag.
- Spin up a new kind cluster (4 control plane and 4 worker nodes) with the node image provided by the `-kind-image` flag.
- Sideload the KNP agent and server images provided with `-agent-image` and `-server-image` into the cluster.
- Deploy the necessary RBAC and service templates for both the KNP agent and server (see `renderAndApplyManifests`).

Expand All @@ -21,3 +21,9 @@ in `main_test.go` performs the following set up steps:
These tests deploy the KNP servers and agents to the previously created kind cluster.
After the deployments are up, the tests check that both the agent and server report
the correct number of connections on their metrics endpoints.

### `lease_count_test.go`

Similar to `static_count_test.go`, except using the new lease-based server counting
system rather than passing the server count to the KNP server deployment as a CLI
flag.
123 changes: 123 additions & 0 deletions e2e/lease_count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e

import (
"fmt"
"testing"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/e2e-framework/pkg/features"
)

func renderLeaseCountDeployments(serverReplicas, agentReplicas int) (serverDeployment client.Object, agentDeployment client.Object, err error) {
serverDeploymentConfig := DeploymentConfig{
Replicas: serverReplicas,
Image: *serverImage,
Args: []CLIFlag{
{Flag: "log-file", Value: "/var/log/konnectivity-server.log"},
{Flag: "logtostderr", Value: "true"},
{Flag: "log-file-max-size", Value: "0"},
{Flag: "uds-name", Value: "/etc/kubernetes/konnectivity-server/konnectivity-server.socket"},
{Flag: "delete-existing-uds-file"},
{Flag: "cluster-cert", Value: "/etc/kubernetes/pki/apiserver.crt"},
{Flag: "cluster-key", Value: "/etc/kubernetes/pki/apiserver.key"},
{Flag: "server-port", Value: "0"},
{Flag: "kubeconfig", Value: "/etc/kubernetes/admin.conf"},
{Flag: "keepalive-time", Value: "1h"},
{Flag: "mode", Value: "grpc"},
{Flag: "agent-namespace", Value: "kube-system"},
{Flag: "agent-service-account", Value: "konnectivity-agent"},
{Flag: "authentication-audience", Value: "system:konnectivity-server"},
{Flag: "enable-lease-controller"},
{Flag: "admin-bind-address", EmptyValue: true},
{Flag: "mode", Value: *connectionMode},
},
}
serverDeployment, _, err = renderTemplate("server/deployment.yaml", serverDeploymentConfig)
if err != nil {
return nil, nil, fmt.Errorf("could not render server deployment: %w", err)
}

agentDeploymentConfig := DeploymentConfig{
Replicas: agentReplicas,
Image: *agentImage,
Args: []CLIFlag{
{Flag: "logtostderr", Value: "true"},
{Flag: "ca-cert", Value: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"},
{Flag: "proxy-server-host", Value: "konnectivity-server.kube-system.svc.cluster.local"},
{Flag: "proxy-server-port", Value: "8091"},
{Flag: "sync-interval", Value: "1s"},
{Flag: "sync-interval-cap", Value: "10s"},
{Flag: "sync-forever"},
{Flag: "probe-interval", Value: "1s"},
{Flag: "service-account-token-path", Value: "/var/run/secrets/tokens/konnectivity-agent-token"},
{Flag: "count-server-leases"},
{Flag: "agent-identifiers", Value: "ipv4=${HOST_IP}"},
{Flag: "admin-bind-address", EmptyValue: true},
},
}
agentDeployment, _, err = renderTemplate("agent/deployment.yaml", agentDeploymentConfig)
if err != nil {
return nil, nil, fmt.Errorf("could not render agent deployment: %w", err)
}

return serverDeployment, agentDeployment, nil
}

func TestSingleServer_SingleAgent_LeaseCount(t *testing.T) {
serverDeployment, agentDeployment, err := renderLeaseCountDeployments(1, 1)
if err != nil {
t.Fatalf("could not render lease count deployments: %v", err)
}

feature := features.New("konnectivity server and agent deployment with single replica for each")
feature = feature.Setup(createDeployment(agentDeployment))
feature = feature.Setup(createDeployment(serverDeployment))
feature = feature.Setup(waitForDeployment(serverDeployment))
feature = feature.Setup(waitForDeployment(agentDeployment))
feature = feature.Assess("konnectivity server has a connected client", assertServersAreConnected(1))
feature = feature.Assess("konnectivity agent is connected to a server", assertAgentsAreConnected(1))
feature = feature.Assess("agent correctly counts 1 lease", assertAgentKnownServerCount(1))
feature = feature.Teardown(deleteDeployment(agentDeployment))
feature = feature.Teardown(deleteDeployment(serverDeployment))

testenv.Test(t, feature.Feature())
}

func TestMultiServer_MultiAgent_LeaseCount(t *testing.T) {
serverDeployment, agentDeployment, err := renderLeaseCountDeployments(2, 2)
if err != nil {
t.Fatalf("could not render lease count deployments: %v", err)
}

feature := features.New("konnectivity server and agent deployment with multiple replicas")
feature = feature.Setup(createDeployment(serverDeployment))
feature = feature.Setup(createDeployment(agentDeployment))
feature = feature.Setup(waitForDeployment(serverDeployment))
feature = feature.Setup(waitForDeployment(agentDeployment))
feature = feature.Setup(scaleDeployment(serverDeployment, 4))
feature = feature.Setup(scaleDeployment(agentDeployment, 4))
feature = feature.Setup(waitForDeployment(agentDeployment))
feature = feature.Setup(waitForDeployment(serverDeployment))
feature = feature.Assess("all servers connected to all clients after scale up", assertServersAreConnected(4))
feature = feature.Assess("all agents connected to all servers after scale up", assertAgentsAreConnected(4))
feature = feature.Assess("agents correctly count 4 leases after scale up", assertAgentKnownServerCount(4))
feature = feature.Teardown(deleteDeployment(agentDeployment))
feature = feature.Teardown(deleteDeployment(serverDeployment))

testenv.Test(t, feature.Feature())
}
Loading

0 comments on commit 707e0c9

Please sign in to comment.