diff --git a/pkg/reconciler/kubeapf/kube_apf.go b/pkg/reconciler/kubeapf/kube_apf.go
new file mode 100644
index 00000000000..5a8641e421e
--- /dev/null
+++ b/pkg/reconciler/kubeapf/kube_apf.go
@@ -0,0 +1,264 @@
+package kubeapf
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	kcpkubernetesinformers "github.com/kcp-dev/client-go/informers"
+	tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
+	"github.com/kcp-dev/kcp/pkg/client"
+	tenancyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/tenancy/v1alpha1"
+	"github.com/kcp-dev/kcp/pkg/logging"
+	"github.com/kcp-dev/logicalcluster/v2"
+	flowcontrol "k8s.io/api/flowcontrol/v1beta2"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/server/mux"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
+	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
+	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+const KubeApfDelegatorName = "kcp-kube-apf-delegator"
+
+// KubeApfDelegator implements the k8s APF controller interface.
+// It is cluster-aware and manages the life cycles of
+// cluster-specific APF controller instances, delegating
+// requests from the handler chain to them.
+type KubeApfDelegator struct {
+	// scopingSharedInformerFactory is not cluster scoped but can be made cluster scoped
+	scopingSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
+	// kubeCluster ClusterInterface can be used to get cluster scoped clientsets
+	kubeCluster kubernetes.ClusterInterface
+
+	// for now, assume these are global configurations for all logical clusters to inherit
+	serverConcurrencyLimit int
+	requestWaitLimit       time.Duration
+
+	cwQueue workqueue.RateLimitingInterface
+
+	lock sync.RWMutex
+	// delegates are the references to cluster specific apf controllers
+	delegates map[logicalcluster.Name]utilflowcontrol.Interface
+	// delegateStopChs are cluster specific stopChs that can be used
+	// to stop a single delegate when its corresponding ClusterWorkspace
+	// is removed
+	delegateStopChs map[logicalcluster.Name]chan struct{}
+
+	pathRecorderMux *mux.PathRecorderMux
+
+	// stopCh lets the delegator receive a stop signal from outside
+	stopCh <-chan struct{}
+
+	utilflowcontrol.KcpWatchTracker
+
+	getClusterWorkspace func(key string) (*tenancyv1alpha1.ClusterWorkspace, error)
+}
+
+// Make sure utilflowcontrol.Interface is implemented
+// var _ utilflowcontrol.Interface = &KubeApfDelegator{}
+
+// NewKubeApfDelegator returns a new KubeApfDelegator that lazily creates and
+// manages one APF controller per logical cluster.
+func NewKubeApfDelegator(
+	informerFactory kcpkubernetesinformers.SharedInformerFactory,
+	kubeCluster kubernetes.ClusterInterface,
+	clusterWorkspacesInformer tenancyinformers.ClusterWorkspaceInformer,
+	serverConcurrencyLimit int,
+	requestWaitLimit time.Duration,
+) *KubeApfDelegator {
+	k := &KubeApfDelegator{
+		scopingSharedInformerFactory: informerFactory, // not cluster scoped
+		kubeCluster:                  kubeCluster,     // can be made cluster scoped
+		serverConcurrencyLimit:       serverConcurrencyLimit,
+		requestWaitLimit:             requestWaitLimit,
+		cwQueue:                      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		delegates:                    map[logicalcluster.Name]utilflowcontrol.Interface{},
+		delegateStopChs:              map[logicalcluster.Name]chan struct{}{},
+		KcpWatchTracker:              utilflowcontrol.NewKcpWatchTracker(),
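+		// getClusterWorkspace is a small indirection over the ClusterWorkspace lister,
+		// kept as a func field (presumably so it can be stubbed out in tests).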
+		getClusterWorkspace: func(key string) (*tenancyv1alpha1.ClusterWorkspace, error) {
+			return clusterWorkspacesInformer.Lister().Get(key)
+		},
+	}
+	clusterWorkspacesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: k.enqueueClusterWorkspace,
+	})
+	return k
+}
+
+// Handle implements flowcontrol.Interface
+func (k *KubeApfDelegator) Handle(ctx context.Context,
+	requestDigest utilflowcontrol.RequestDigest,
+	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func() fcrequest.WorkEstimate,
+	queueNoteFn fq.QueueNoteFn,
+	execFn func(),
+) {
+	cluster, _ := genericapirequest.ValidClusterFrom(ctx)
+	klog.V(3).InfoS("KubeApfFilter Handle request for cluster", "clusterName", cluster.Name)
+
+	delegate, _ := k.getOrCreateDelegate(cluster.Name)
+	delegate.Handle(ctx, requestDigest, noteFn, workEstimator, queueNoteFn, execFn)
+}
+
+// Install implements flowcontrol.Interface
+func (k *KubeApfDelegator) Install(c *mux.PathRecorderMux) {
+	k.pathRecorderMux = c // store the reference for Install later // FIXME
+}
+
+// MaintainObservations doesn't actually call the delegates' MaintainObservations functions directly.
+// It stores the stopCh for later use.
+func (k *KubeApfDelegator) MaintainObservations(stopCh <-chan struct{}) {
+	k.lock.Lock()
+	if k.stopCh == nil {
+		k.stopCh = stopCh
+	}
+	k.lock.Unlock()
+	// Block waiting only so that it behaves similarly to cfgCtlr
+	<-stopCh
+}
+
+// Run starts a goroutine to watch ClusterWorkspace deletions
+func (k *KubeApfDelegator) Run(stopCh <-chan struct{}) error {
+	k.lock.Lock()
+	if k.stopCh == nil {
+		k.stopCh = stopCh
+	}
+	k.lock.Unlock()
+
+	go wait.Until(k.runClusterWorkspaceWorker, time.Second, stopCh)
+
+	<-stopCh
+	return nil
+}
+
+// getOrCreateDelegate creates a utilflowcontrol.Interface (apf filter) for clusterName.
+func (k *KubeApfDelegator) getOrCreateDelegate(clusterName logicalcluster.Name) (utilflowcontrol.Interface, error) {
+	k.lock.RLock()
+	delegate := k.delegates[clusterName]
+	k.lock.RUnlock()
+
+	if delegate != nil {
+		return delegate, nil
+	}
+
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
+	delegate = k.delegates[clusterName]
+	if delegate != nil {
+		return delegate, nil
+	}
+
+	delegateStopCh := make(chan struct{})
+	go func() {
+		select {
+		case <-k.stopCh:
+			close(delegateStopCh)
+		case <-delegateStopCh:
+		}
+	}()
+
+	// New delegate uses cluster scoped informer factory and flowcontrol clients
+	scopedFlowSchemaInformer := k.scopingSharedInformerFactory.Flowcontrol().V1beta2().FlowSchemas().Cluster(clusterName)
+	scopedPriorityLevelConfigurationInformer := k.scopingSharedInformerFactory.Flowcontrol().V1beta2().PriorityLevelConfigurations().Cluster(clusterName)
+
+	flowcontrolClient := k.kubeCluster.Cluster(clusterName).FlowcontrolV1beta2()
+	delegate = utilflowcontrol.New(
+		scopedFlowSchemaInformer,
+		scopedPriorityLevelConfigurationInformer,
+		flowcontrolClient,
+		k.serverConcurrencyLimit,
+		k.requestWaitLimit,
+	)
+	// scopedInformerFactory.Start(delegateStopCh)
+	k.delegates[clusterName] = delegate
+	k.delegateStopChs[clusterName] = delegateStopCh
+	// TODO: can Unlock here?
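+	// Note: the write lock is held until this function returns, so the goroutines below
+	// are started at most once per cluster even if concurrent requests race to create
+	// the delegate.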
+
+	// Run cluster specific apf controller
+	go delegate.MaintainObservations(delegateStopCh) // FIXME: Metric observations need to work per-cluster --> beware of metrics explosion
+	go delegate.Run(delegateStopCh)
+
+	// TODO: need to install per-cluster debug endpoint
+	delegate.Install(k.pathRecorderMux) // FIXME: this is nil
+
+	klog.V(3).InfoS("Started new apf controller for cluster", "clusterName", clusterName)
+	return delegate, nil
+}
+
+func (k *KubeApfDelegator) enqueueClusterWorkspace(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		runtime.HandleError(err)
+		return
+	}
+	logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), KubeApfDelegatorName), key)
+	logger.V(2).Info("queueing ClusterWorkspace")
+	k.cwQueue.Add(key)
+}
+
+func (k *KubeApfDelegator) runClusterWorkspaceWorker() {
+	for k.processNext(k.cwQueue, k.processClusterWorkspace) {
+	}
+}
+
+func (c *KubeApfDelegator) processNext(
+	queue workqueue.RateLimitingInterface,
+	processFunc func(key string) error,
+) bool {
+	// Wait until there is a new item in the working queue
+	k, quit := queue.Get()
+	if quit {
+		return false
+	}
+	key := k.(string)
+
+	// No matter what, tell the queue we're done with this key, to unblock
+	// other workers.
+	defer queue.Done(key)
+
+	if err := processFunc(key); err != nil {
+		runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", KubeApfDelegatorName, key, err))
+		queue.AddRateLimited(key)
+		return true
+	}
+	queue.Forget(key)
+	return true
+}
+
+func (k *KubeApfDelegator) processClusterWorkspace(key string) error {
+	// e.g. root:orgws
+	parent, name := client.SplitClusterAwareKey(key)
+
+	// turn it into root:org:ws
+	clusterName := parent.Join(name)
+	_, err := k.getClusterWorkspace(key)
+	if err != nil {
+		if kerrors.IsNotFound(err) {
+			k.stopAndRemoveDelegate(clusterName)
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+func (k *KubeApfDelegator) stopAndRemoveDelegate(cluster logicalcluster.Name) {
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
+	if stopCh, ok := k.delegateStopChs[cluster]; ok {
+		close(stopCh)
+		delete(k.delegateStopChs, cluster)
+	}
+
+	delete(k.delegates, cluster)
+}
diff --git a/pkg/reconciler/soct/controller.go b/pkg/reconciler/soct/controller.go
new file mode 100644
index 00000000000..77aaf455bff
--- /dev/null
+++ b/pkg/reconciler/soct/controller.go
@@ -0,0 +1,222 @@
+package soct
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
+	"github.com/kcp-dev/kcp/pkg/client"
+	tenancyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/tenancy/v1alpha1"
+	"github.com/kcp-dev/kcp/pkg/informer"
+	"github.com/kcp-dev/kcp/pkg/logging"
+	"github.com/kcp-dev/logicalcluster/v2"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
+
+	// kubernetesclient "k8s.io/client-go/kubernetes"
+	kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+const SOCTControllerName = "kcp-storage-object-count-tracker-controller"
+
+// SOCTController monitors ClusterWorkspaces, APIBindings and CRDs and creates or deletes
+// cluster-scoped storage object count observer goroutines and trackers accordingly.
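+// (A tracker is kept per logical cluster, with observer goroutines started per resource type.)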
+// It contains a registry of Getter funcs for all resource types, which the observer
+// goroutines use to update the tracked object counts. It also acts as a multiplexer for
+// API request-driven queries that retrieve the latest resource object counts.
+type SOCTController struct {
+	dynamicDiscoverySharedInformerFactory *informer.DynamicDiscoverySharedInformerFactory
+	cwQueue                               workqueue.RateLimitingInterface
+	getterRegistry                        flowcontrolrequest.StorageObjectCountGetterRegistry
+	tracker                               flowcontrolrequest.KcpStorageObjectCountTracker
+
+	getClusterWorkspace func(key string) (*tenancyv1alpha1.ClusterWorkspace, error)
+}
+
+// NewSOCTController returns a new SOCTController.
+func NewSOCTController(
+	kubeClusterClient *kcpkubernetesclientset.ClusterClientset, // FIXME: unused?
+	clusterWorkspacesInformer tenancyinformers.ClusterWorkspaceInformer,
+	dynamicDiscoverySharedInformerFactory *informer.DynamicDiscoverySharedInformerFactory,
+	getterRegistry flowcontrolrequest.StorageObjectCountGetterRegistry,
+	tracker flowcontrolrequest.KcpStorageObjectCountTracker,
+) *SOCTController {
+	c := &SOCTController{
+		dynamicDiscoverySharedInformerFactory: dynamicDiscoverySharedInformerFactory,
+		cwQueue:                               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		getterRegistry:                        getterRegistry,
+		tracker:                               tracker,
+		getClusterWorkspace: func(key string) (*tenancyv1alpha1.ClusterWorkspace, error) {
+			return clusterWorkspacesInformer.Lister().Get(key)
+		},
+	}
+
+	clusterWorkspacesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.enqueueClusterWorkspace,
+		DeleteFunc: c.enqueueClusterWorkspace,
+	})
+	return c
+}
+
+func (c *SOCTController) enqueueClusterWorkspace(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		runtime.HandleError(err)
+		return
+	}
+	logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), SOCTControllerName), key)
+	logger.V(2).Info("queueing ClusterWorkspace")
+	c.cwQueue.Add(key)
+}
+
+// Run starts the SOCT controller. It needs to keep updating the counters in the
+// trackers for APF to work.
+func (c *SOCTController) Run(ctx context.Context) {
+	defer runtime.HandleCrash()
+	defer c.cwQueue.ShutDown()
+
+	logger := logging.WithReconciler(klog.FromContext(ctx), SOCTControllerName)
+	ctx = klog.NewContext(ctx, logger)
+	logger.Info("Starting controller")
+	defer logger.Info("Shutting down controller")
+
+	// Start trackers for the default clusters
+	defaultClusters := []logicalcluster.Name{
+		logicalcluster.New("root"),
+		logicalcluster.New("system:system-crds"),
+		logicalcluster.New("system:shard"),
+		logicalcluster.New("system:admin"),
+		logicalcluster.New("system:bound-crds"),
+		logicalcluster.New("root:compute"),
+	}
+	for _, cluster := range defaultClusters {
+		logger.Info("Starting storage object count tracker for logical cluster", "clusterName", cluster)
+		c.startClusterWorkspaceTracker(ctx, cluster)
+		defer c.stopClusterWorkspaceTracker(ctx, cluster)
+	}
+
+	go wait.UntilWithContext(ctx, c.runClusterWorkspaceWorker, time.Second)
+
+	<-ctx.Done()
+}
+
+func (c *SOCTController) runClusterWorkspaceWorker(ctx context.Context) {
+	for c.processNext(ctx, c.cwQueue, c.processClusterWorkspace) {
+	}
+}
+
+func (c *SOCTController) processNext(
+	ctx context.Context,
+	queue workqueue.RateLimitingInterface,
+	processFunc func(ctx context.Context, key string) error,
+) bool {
+	// Wait until there is a new item in the working queue
+	k, quit := queue.Get()
+	if quit {
+		return false
+	}
+	key := k.(string)
+
+	logger := logging.WithQueueKey(klog.FromContext(ctx), key)
+	ctx = klog.NewContext(ctx, logger)
+	logger.V(1).Info("SOCT-processing-key")
+
+	// No matter what, tell the queue we're done with this key, to unblock
+	// other workers.
+	defer queue.Done(key)
+
+	if err := processFunc(ctx, key); err != nil {
+		runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", SOCTControllerName, key, err))
+		queue.AddRateLimited(key)
+		return true
+	}
+	queue.Forget(key)
+	return true
+}
+
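+// processClusterWorkspace starts an object count tracker for the ClusterWorkspace identified
+// by key, or stops and removes its tracker when the workspace no longer exists.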
+func (c *SOCTController) processClusterWorkspace(ctx context.Context, key string) error {
+	logger := klog.FromContext(ctx)
+	// e.g. root:orgws
+	parent, name := client.SplitClusterAwareKey(key)
+
+	// turn it into root:org:ws
+	clusterName := parent.Join(name)
+	ws, err := c.getClusterWorkspace(key)
+	logger = logger.WithValues("logicalCluster", clusterName.String())
+	logger.V(2).Info("processClusterWorkspace called")
+	if err != nil {
+		if kerrors.IsNotFound(err) {
+			logger.V(2).Info("ClusterWorkspace not found - deleting tracker")
+			c.stopClusterWorkspaceTracker(ctx, clusterName)
+			return nil
+		}
+		return err
+	}
+	logger = logging.WithObject(logger, ws)
+	c.startClusterWorkspaceTracker(ctx, clusterName)
+	logger.V(2).Info("Cluster tracker started")
+
+	return nil
+}
+
+func (c *SOCTController) startClusterWorkspaceTracker(ctx context.Context, clusterName logicalcluster.Name) {
+	clusterNameStr := clusterName.String()
+
+	// Pass the global context in to the cluster specific tracker so that the pruning and the
+	// observer goroutines can be gracefully terminated if the global context is cancelled.
+	c.tracker.CreateClusterSpecificTracker(ctx, clusterNameStr)
+
+	// Start a goroutine to subscribe to changes in the served APIs
+	apisChanged := c.dynamicDiscoverySharedInformerFactory.Subscribe("soct-" + clusterNameStr)
+	logger := klog.FromContext(ctx)
+	logger = logger.WithValues("logicalCluster", clusterName.String())
+
+	go func() {
+		var discoveryCancel func()
+
+		// Each time the set of served APIs changes, the observers started for the previous
+		// set are cancelled via discoveryCancel and observing is (re)started for every
+		// resource currently known to the informer factory.
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case _, more := <-apisChanged:
+				logger.V(2).Info("apisChanged TRIGGERED -- RESETTING Observers")
+				if discoveryCancel != nil {
+					discoveryCancel()
+					if !more { // FIXME: Use cluster specific context instead of relying on apisChanged channel?
+						return
+					}
+				}
+				apisChangedCtx, cancelFunc := context.WithCancel(ctx)
+				discoveryCancel = cancelFunc
+
+				listers, notSynced := c.dynamicDiscoverySharedInformerFactory.Listers()
+
+				for gvr := range listers {
+					resourceName := gvr.GroupResource().String()
+					c.tracker.EnsureObserving(apisChangedCtx, clusterNameStr, resourceName,
+						func() int64 { return c.getterRegistry.GetObjectCount(clusterName, resourceName) },
+					)
+				}
+				for _, gvr := range notSynced { // TODO: confirm with Andy and Steve whether this is needed
+					resourceName := gvr.GroupResource().String()
+					c.tracker.EnsureObserving(apisChangedCtx, clusterNameStr, resourceName,
+						func() int64 { return c.getterRegistry.GetObjectCount(clusterName, resourceName) },
+					)
+				}
+			}
+		}
+	}()
+}
+
+func (c *SOCTController) stopClusterWorkspaceTracker(ctx context.Context, clusterName logicalcluster.Name) {
+	clusterNameStr := clusterName.String()
+	c.dynamicDiscoverySharedInformerFactory.Unsubscribe("soct-" + clusterNameStr)
+	c.tracker.DeleteClusterSpecificTracker(clusterNameStr)
+}
diff --git a/pkg/server/config.go b/pkg/server/config.go
index 3c8eb272079..a46f901b330 100644
--- a/pkg/server/config.go
+++ b/pkg/server/config.go
@@ -62,10 +62,13 @@ import (
 	"github.com/kcp-dev/kcp/pkg/informer"
 	"github.com/kcp-dev/kcp/pkg/server/bootstrap"
 	kcpfilters "github.com/kcp-dev/kcp/pkg/server/filters"
+	"github.com/kcp-dev/kcp/pkg/reconciler/kubeapf"
 	kcpserveroptions "github.com/kcp-dev/kcp/pkg/server/options"
 	"github.com/kcp-dev/kcp/pkg/server/options/batteries"
 	"github.com/kcp-dev/kcp/pkg/server/requestinfo"
 	"github.com/kcp-dev/kcp/pkg/tunneler"
+	genericfeatures "k8s.io/apiserver/pkg/features"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 )
 
 type Config struct {
@@ -263,6 +266,19 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
 		kcpinformers.WithExtraClusterScopedIndexers(indexers.ClusterScoped()),
 		kcpinformers.WithExtraNamespaceScopedIndexers(indexers.NamespaceScoped()),
 	)
+
+	// Construct the flow-control (APF) config here in kcp instead of via kube's BuildPriorityAndFairness
+	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && c.Options.GenericControlPlane.GenericServerRunOptions.EnablePriorityAndFairness {
+		c.GenericConfig.FlowControl = kubeapf.NewKubeApfDelegator(
+			c.KubeSharedInformerFactory,
+			c.KubeClusterClient,
+			c.KcpSharedInformerFactory.Tenancy().V1alpha1().ClusterWorkspaces(),
+			1600, // server concurrency limit (currently hardcoded)
+			c.Options.GenericControlPlane.GenericServerRunOptions.RequestTimeout/4, // request wait limit
+		)
+
+	}
+
 	c.DeepSARClient, err = kcpkubernetesclientset.NewForConfig(authorization.WithDeepSARConfig(rest.CopyConfig(c.GenericConfig.LoopbackClientConfig)))
 	if err != nil {
 		return nil, err
diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go
index cb1b28420cb..1387f52c991 100644
--- a/pkg/server/controllers.go
+++ b/pkg/server/controllers.go
@@ -67,6 +67,7 @@ import (
 	"github.com/kcp-dev/kcp/pkg/reconciler/kubequota"
 	schedulinglocationstatus "github.com/kcp-dev/kcp/pkg/reconciler/scheduling/location"
 	schedulingplacement "github.com/kcp-dev/kcp/pkg/reconciler/scheduling/placement"
+	"github.com/kcp-dev/kcp/pkg/reconciler/soct"
 	"github.com/kcp-dev/kcp/pkg/reconciler/tenancy/bootstrap"
 	"github.com/kcp-dev/kcp/pkg/reconciler/tenancy/clusterworkspace"
 	"github.com/kcp-dev/kcp/pkg/reconciler/tenancy/clusterworkspacedeletion"
@@ -1181,6 +1182,46 @@ func (s *Server) installKubeQuotaController(
 	return nil
 }
 
+func (s *Server) installKcpSOCTController(
+	ctx context.Context,
+	config *rest.Config,
+	server *genericapiserver.GenericAPIServer,
+) error {
+	controllerName := "kcp-storage-object-count-tracker-controller"
+	config = rest.CopyConfig(config)
+
+	config = rest.AddUserAgent(config, controllerName)
+	kubeClusterClient, err := kcpkubernetesclientset.NewForConfig(config)
+	if err != nil {
+		return err
+	}
+
+	c := soct.NewSOCTController(
+		kubeClusterClient,
+		s.KcpSharedInformerFactory.Tenancy().V1alpha1().ClusterWorkspaces(),
+		s.DynamicDiscoverySharedInformerFactory,
+		s.GenericConfig.StorageObjectCountGetterRegistry,
+		s.GenericConfig.KcpStorageObjectCountTracker,
+	)
+
+	if err := server.AddPostStartHook(postStartHookName(controllerName), func(hookContext genericapiserver.PostStartHookContext) error {
+		logger := klog.FromContext(ctx).WithValues("postStartHook", postStartHookName(controllerName))
+		if err := s.waitForSync(hookContext.StopCh); err != nil {
+			logger.Error(err, "failed to finish post-start-hook")
+			// nolint:nilerr
+			return nil // don't klog.Fatal. This only happens when context is cancelled.
+		}
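+
+		// c.Run blocks until its context is cancelled, so start it in its own goroutine.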
+		go c.Run(goContext(hookContext))
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (s *Server) installApiExportIdentityController(ctx context.Context, config *rest.Config, server *genericapiserver.GenericAPIServer) error {
 	if s.Options.Extra.ShardName == tenancyv1alpha1.RootShard {
 		return nil
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7a56cf97b0d..bcbc5446a3a 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -519,6 +519,12 @@ func (s *Server) Run(ctx context.Context) error {
 		}
 	}
 
+	if s.Options.Controllers.EnableAll {
+		if err := s.installKcpSOCTController(ctx, controllerConfig, delegationChainHead); err != nil {
+			return err
+		}
+	}
+
 	if s.Options.Virtual.Enabled {
 		if err := s.installVirtualWorkspaces(ctx, controllerConfig, delegationChainHead, s.GenericConfig.Authentication, s.GenericConfig.ExternalAddress, s.GenericConfig.AuditPolicyRuleEvaluator, s.preHandlerChainMux); err != nil {
 			return err