From 7822d9c572cb005cf5ff4137906337496f446c5c Mon Sep 17 00:00:00 2001 From: chenpu <1102509144@163.com> Date: Fri, 29 Mar 2024 15:36:48 +0800 Subject: [PATCH] fix: add finalizer to prevent volume leakage --- cmd/csi-provisioner/csi-provisioner.go | 39 ++-- pkg/controller/controller.go | 57 +++++- pkg/controller/provisioning_controller.go | 174 ++++++++++++++++++ .../provisioning_controller_test.go | 97 ++++++++++ 4 files changed, 351 insertions(+), 16 deletions(-) create mode 100644 pkg/controller/provisioning_controller.go create mode 100644 pkg/controller/provisioning_controller_test.go diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 23e4e3e4fc..b9669c835b 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -70,18 +70,21 @@ import ( ) var ( - master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.") - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.") - csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.") - volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.") - volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.") - showVersion = flag.Bool("version", false, "Show version.") - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.") - workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.") - finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal") - capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects") - operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)") + master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.") + kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.") + csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.") + volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.") + volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.") + showVersion = flag.Bool("version", false, "Show version.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. 
It doubles with each failure, up to retry-interval-max.")
+ retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
+ workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
+ finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
+ provisioningFinalizerThreads = flag.Uint("provisioning-protection-threads", 1, "Number of simultaneously running threads, handling provisioning finalizer removal")
+ capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
+ operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)")

 enableLeaderElection = flag.Bool("leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")

@@ -378,6 +381,8 @@ func main() {
 // PersistentVolumeClaims informer
 rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
 claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
+ provisioningRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
+ provisioningClaimQueue := workqueue.NewNamedRateLimitingQueue(provisioningRateLimiter, "provisioning-protection")
 claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

 // Setup options
@@ -568,6 +573,13 @@ func main() {
 controllerCapabilities,
 )

+ provisioningProtectionController := ctrl.NewProvisioningProtectionController(
+ clientset,
+ claimLister,
+ claimInformer,
+ provisioningClaimQueue,
+ )
+
 // Start HTTP server, regardless whether we are the leader or not.
 if addr != "" {
 // To collect metrics data from the metric handler itself, we
@@ -643,6 +655,9 @@ func main() {
 if csiClaimController != nil {
 go csiClaimController.Run(ctx, int(*finalizerThreads))
 }
+ if provisioningProtectionController != nil {
+ go provisioningProtectionController.Run(ctx, int(*provisioningFinalizerThreads))
+ }

 provisionController.Run(ctx)
 }
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 9113474d1e..0d1f2a916c 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -135,9 +135,10 @@ const (
 annMigratedTo = "pv.kubernetes.io/migrated-to"

 // TODO: Beta will be deprecated and removed in a later release
- annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
- annStorageProvisioner = "volume.kubernetes.io/storage-provisioner"
- annSelectedNode = "volume.kubernetes.io/selected-node"
+ annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
+ annStorageProvisioner = "volume.kubernetes.io/storage-provisioner"
+ annSelectedNode = "volume.kubernetes.io/selected-node"
+ annProvisioningConsistency = "volume.kubernetes.io/provisioning-consistency"

 // Annotation for secret name and namespace will be added to the pv object
 // and used at pvc deletion time. 
@@ -146,7 +147,8 @@ const (

 snapshotNotBound = "snapshot %s not bound"

- pvcCloneFinalizer = "provisioner.storage.kubernetes.io/cloning-protection"
+ pvcCloneFinalizer = "provisioner.storage.kubernetes.io/cloning-protection"
+ pvcProvisioningFinalizer = "provisioner.storage.kubernetes.io/provisioning-protection"

 annAllowVolumeModeChange = "snapshot.storage.kubernetes.io/allow-volume-mode-change"
 )
@@ -801,6 +803,13 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi
 }
 }

+ // If provisioning consistency is disabled and the PVC is being deleted, remove the finalizer and abandon the provisioning process. A CSI driver may still be provisioning the volume at this point, in which case the volume may leak; this trade-off should be documented.
+ if options.PVC.DeletionTimestamp != nil &&
+ options.PVC.Annotations != nil && options.PVC.Annotations[annProvisioningConsistency] == "disable" {
+ err := p.removeProvisioningFinalizer(ctx, options.PVC)
+ return nil, controller.ProvisioningFinished, err
+ }
+
 // The same check already ran in ShouldProvision, but perhaps
 // it couldn't complete due to some unexpected error.
 owned, err := p.checkNode(ctx, claim, options.StorageClass, "provision")
@@ -823,6 +832,14 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi
 pvName := req.Name
 provisionerCredentials := req.Secrets

+ // Add the provisioning finalizer before calling CreateVolume
+ if options.PVC.Annotations != nil && options.PVC.Annotations[annProvisioningConsistency] == "enable" {
+ err = p.setProvisioningFinalizer(ctx, options.PVC)
+ if err != nil {
+ return nil, controller.ProvisioningNoChange, err
+ }
+ }
+
 createCtx := markAsMigrated(ctx, result.migratedVolume)
 createCtx, cancel := context.WithTimeout(createCtx, p.timeout)
 defer cancel()
@@ -992,6 +1009,38 @@ func (p *csiProvisioner) setCloneFinalizer(ctx context.Context, pvc *v1.Persiste
 return nil
 }

+func (p *csiProvisioner) setProvisioningFinalizer(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
+ clone := claim.DeepCopy()
+ if !checkFinalizer(clone, pvcProvisioningFinalizer) {
+ clone.Finalizers = append(clone.Finalizers, pvcProvisioningFinalizer)
+ if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
+ return err
+ }
+ klog.V(5).Infof("successfully set provisioning finalizer on PVC %s/%s", clone.Namespace, clone.Name)
+ }
+
+ return nil
+}
+
+func (p *csiProvisioner) removeProvisioningFinalizer(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
+ clone := claim.DeepCopy()
+ if !checkFinalizer(clone, pvcProvisioningFinalizer) {
+ return nil
+ }
+
+ newFinalizers := make([]string, 0)
+ for _, f := range clone.GetFinalizers() {
+ if f == pvcProvisioningFinalizer {
+ continue
+ }
+ newFinalizers = append(newFinalizers, f)
+ }
+
+ clone.Finalizers = newFinalizers
+ if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
+ return err
+ }
+ klog.V(5).Infof("successfully removed provisioning finalizer from PVC %s/%s", clone.Namespace, clone.Name)
+ return nil
+}
+
 func (p *csiProvisioner) supportsTopology() bool {
 return SupportsTopology(p.pluginCapabilities)
 }
diff --git a/pkg/controller/provisioning_controller.go b/pkg/controller/provisioning_controller.go
new file mode 100644
index 0000000000..a295f51cb1
--- /dev/null
+++ b/pkg/controller/provisioning_controller.go
@@ -0,0 +1,174 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ v1 "k8s.io/api/core/v1"
+ apierrs "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/kubernetes"
+ corelisters "k8s.io/client-go/listers/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/sig-storage-lib-external-provisioner/v9/controller"
+)
+
+type ProvisioningProtectionController struct {
+ client kubernetes.Interface
+ claimLister corelisters.PersistentVolumeClaimLister
+ claimInformer cache.SharedInformer
+ claimQueue workqueue.RateLimitingInterface
+}
+
+// NewProvisioningProtectionController creates a new controller for additional CSI claim protection capabilities
+func NewProvisioningProtectionController(
+ client kubernetes.Interface,
+ claimLister corelisters.PersistentVolumeClaimLister,
+ claimInformer cache.SharedInformer,
+ claimQueue workqueue.RateLimitingInterface,
+) *ProvisioningProtectionController {
+ c := &ProvisioningProtectionController{
+ client: client,
+ claimLister: claimLister,
+ claimInformer: claimInformer,
+ claimQueue: claimQueue,
+ }
+ return c
+}
+
+// Run is the main ProvisioningProtectionController handler
+func (p *ProvisioningProtectionController) Run(ctx context.Context, threadiness int) {
+ klog.Info("Starting ProvisioningProtection controller")
+ defer utilruntime.HandleCrash()
+ defer p.claimQueue.ShutDown()
+
+ claimHandler := cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) { p.enqueueClaimUpdate(obj) },
+ UpdateFunc: func(_ interface{}, newObj interface{}) { p.enqueueClaimUpdate(newObj) },
+ }
+ p.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.DefaultResyncPeriod)
+
+ for i := 0; i < threadiness; i++ {
+ go wait.Until(func() {
+ p.runClaimWorker(ctx)
+ }, time.Second, ctx.Done())
+ }
+
+ klog.Info("Started ProvisioningProtection controller")
+ <-ctx.Done()
+ klog.Info("Shutting down ProvisioningProtection controller")
+}
+
+func (p *ProvisioningProtectionController) runClaimWorker(ctx context.Context) {
+ for p.processNextClaimWorkItem(ctx) {
+ }
+}
+
+// processNextClaimWorkItem processes items from claimQueue
+func (p *ProvisioningProtectionController) processNextClaimWorkItem(ctx context.Context) bool {
+ obj, shutdown := p.claimQueue.Get()
+ if shutdown {
+ return false
+ }
+
+ err := func(obj interface{}) error {
+ defer p.claimQueue.Done(obj)
+ var key string
+ var ok bool
+ if key, ok = obj.(string); !ok {
+ p.claimQueue.Forget(obj)
+ return fmt.Errorf("expected string in workqueue but got %#v", obj)
+ }
+
+ if err := p.syncClaimHandler(ctx, key); err != nil {
+ klog.Warningf("Retrying syncing claim %q after %v failures", key, p.claimQueue.NumRequeues(obj))
+ p.claimQueue.AddRateLimited(obj)
+ } else {
+ p.claimQueue.Forget(obj)
+ }
+
+ return nil
+ }(obj)
+
+ if err != nil {
+ utilruntime.HandleError(err)
+ return true
+ }
+
+ return true
+}
+
+// enqueueClaimUpdate takes a PVC object and stores it into the claim work 
queue.
+func (p *ProvisioningProtectionController) enqueueClaimUpdate(obj interface{}) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ utilruntime.HandleError(err)
+ return
+ }
+
+ p.claimQueue.Add(key)
+}
+
+// syncClaimHandler gets the claim from the informer's cache and then calls syncClaim
+func (p *ProvisioningProtectionController) syncClaimHandler(ctx context.Context, key string) error {
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
+ return nil
+ }
+
+ claim, err := p.claimLister.PersistentVolumeClaims(namespace).Get(name)
+ if err != nil {
+ if apierrs.IsNotFound(err) {
+ utilruntime.HandleError(fmt.Errorf("item '%s' in work queue no longer exists", key))
+ return nil
+ }
+
+ return err
+ }
+
+ return p.syncClaim(ctx, claim)
+}
+
+// syncClaim removes the provisioning finalizer from a PVC once provisioning has finished
+func (p *ProvisioningProtectionController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
+ if !checkFinalizer(claim, pvcProvisioningFinalizer) || claim.Spec.VolumeName == "" {
+ return nil
+ }
+
+ // Remove the provisioning finalizer
+ finalizers := make([]string, 0)
+ for _, finalizer := range claim.ObjectMeta.Finalizers {
+ if finalizer != pvcProvisioningFinalizer {
+ finalizers = append(finalizers, finalizer)
+ }
+ }
+
+ clone := claim.DeepCopy()
+ clone.Finalizers = finalizers
+ if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/pkg/controller/provisioning_controller_test.go b/pkg/controller/provisioning_controller_test.go
new file mode 100644
index 0000000000..f6412dd30a
--- /dev/null
+++ b/pkg/controller/provisioning_controller_test.go
@@ -0,0 +1,97 @@
+package controller
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/informers"
+ fakeclientset "k8s.io/client-go/kubernetes/fake"
+ "k8s.io/client-go/util/workqueue"
+)
+
+func provisioningBaseClaim(pvName string) *v1.PersistentVolumeClaim {
+ return fakeClaim("test", "test", "fake-claim-uid", 1000, pvName, v1.ClaimBound, &fakeSc1, "").DeepCopy()
+}
+
+func TestProvisioningFinalizerRemoval(t *testing.T) {
+ testcases := map[string]struct {
+ claim runtime.Object
+ expectFinalizer bool
+ expectError error
+ }{
+ "provisioning pvc without volumeName": {
+ claim: pvcFinalizers(provisioningBaseClaim(""), pvcProvisioningFinalizer),
+ expectFinalizer: true,
+ },
+ "pvc without provisioning finalizer": {
+ claim: baseClaim(),
+ expectFinalizer: false,
+ },
+ "provisioning pvc with volumeName": {
+ claim: pvcFinalizers(provisioningBaseClaim("test"), pvcProvisioningFinalizer),
+ expectFinalizer: false,
+ },
+ }
+
+ for k, tc := range testcases {
+ tc := tc
+ t.Run(k, func(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+
+ clientSet := fakeclientset.NewSimpleClientset(tc.claim)
+ provisioningProtector := fakeProvisioningProtector(clientSet, tc.claim)
+
+ claim := tc.claim.(*v1.PersistentVolumeClaim)
+ err := provisioningProtector.syncClaim(ctx, claim)
+
+ // Get the updated claim after the reconcile finishes
+ claim, _ = clientSet.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
+
+ // Check finalizer removal
+ if tc.expectFinalizer && 
!checkFinalizer(claim, pvcProvisioningFinalizer) { + t.Errorf("Claim finalizer was expected to be found on: %s", claim.Name) + } else if !tc.expectFinalizer && checkFinalizer(claim, pvcProvisioningFinalizer) { + t.Errorf("Claim finalizer was not expected to be found on: %s", claim.Name) + } + + if tc.expectError == nil && err != nil { + t.Errorf("Caught an unexpected error during 'syncClaim' run: %s", err) + } else if tc.expectError != nil && err == nil { + t.Errorf("Expected error during 'syncClaim' run, got nil: %s", tc.expectError) + } else if tc.expectError != nil && err != nil && tc.expectError.Error() != err.Error() { + t.Errorf("Unexpected error during 'syncClaim' run:\n\t%s\nExpected:\n\t%s", err, tc.expectError) + } + }) + } + +} + +func fakeProvisioningProtector(client *fakeclientset.Clientset, objects ...runtime.Object) *ProvisioningProtectionController { + utilruntime.ReallyCrash = false + + informerFactory := informers.NewSharedInformerFactory(client, 1*time.Second) + claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer() + claimLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() + rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 2*time.Second) + claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "provisioning-protection") + + for _, claim := range objects { + claimInformer.GetStore().Add(claim) + } + + informerFactory.WaitForCacheSync(context.TODO().Done()) + go informerFactory.Start(context.TODO().Done()) + + return NewProvisioningProtectionController( + client, + claimLister, + claimInformer, + claimQueue, + ) +}
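
Reviewer note (not part of the patch): below is a minimal sketch of how a PVC would opt into the new provisioning-protection behavior. The claim name, namespace, and storage class are placeholders, and the fake clientset is used only to keep the snippet self-contained and runnable; the annotation key and value are the ones introduced by this patch. With the annotation set to "enable", the provisioner adds the provisioner.storage.kubernetes.io/provisioning-protection finalizer before CreateVolume, and the ProvisioningProtectionController removes it once spec.volumeName is populated; with "disable" on a PVC that is being deleted, Provision drops the finalizer and abandons provisioning, at the risk of a leaked volume.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Placeholder storage class name; any provisioner-backed class works here.
	sc := "example-sc"

	// Hypothetical PVC that opts into provisioning protection via the
	// annotation added by this patch. Resource requests and other required
	// spec fields are omitted for brevity; the fake clientset does not
	// validate them.
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-claim", // placeholder
			Namespace: "default",
			Annotations: map[string]string{
				"volume.kubernetes.io/provisioning-consistency": "enable",
			},
		},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			StorageClassName: &sc,
		},
	}

	// A fake clientset keeps this sketch self-contained; a real deployment
	// would use the in-cluster client instead.
	client := fakeclientset.NewSimpleClientset()
	created, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("created PVC %s/%s with annotations %v\n", created.Namespace, created.Name, created.Annotations)
}

The intent, per the commit subject, is that the claim object stays around until provisioning has concluded, so a volume created by an in-flight CreateVolume call is not orphaned when the PVC is deleted mid-provisioning.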