fix: add finalizer to prevent volume leakage #1179

Closed · wants to merge 1 commit
39 changes: 27 additions & 12 deletions cmd/csi-provisioner/csi-provisioner.go
@@ -70,18 +70,21 @@ import (
)

var (
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Default behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)")
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Default behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
provisioningFinalizerThreads = flag.Uint("provisioning-protection-threads", 1, "Number of simultaneously running threads, handling provisioning finalizer removal")
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)")

enableLeaderElection = flag.Bool("leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")

@@ -378,6 +381,8 @@ func main() {
// PersistentVolumeClaims informer
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
provisioningRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
provisioningClaimQueue := workqueue.NewNamedRateLimitingQueue(provisioningRateLimiter, "provisioning-protection")
claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

// Setup options
@@ -568,6 +573,13 @@ func main() {
controllerCapabilities,
)

provisioningProtectionController := ctrl.NewProvisioningProtectionController(
clientset,
claimLister,
claimInformer,
provisioningClaimQueue,
)

// Start HTTP server, regardless whether we are the leader or not.
if addr != "" {
// To collect metrics data from the metric handler itself, we
@@ -643,6 +655,9 @@ func main() {
if csiClaimController != nil {
go csiClaimController.Run(ctx, int(*finalizerThreads))
}
if provisioningProtectionController != nil {
go provisioningProtectionController.Run(ctx, int(*provisioningFinalizerThreads))
}
provisionController.Run(ctx)
}

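To make the new behavior concrete, here is a minimal sketch of a client creating a PVC that opts into provisioning protection. Only the annotation key volume.kubernetes.io/provisioning-consistency and its "enable"/"disable" values come from this PR; the kubeconfig path, names, and StorageClass are hypothetical, and the Resources field type assumes k8s.io/api v0.29 or newer (older releases use v1.ResourceRequirements).

package main

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Hypothetical out-of-cluster client setup.
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    sc := "example-csi-sc" // hypothetical StorageClass handled by this provisioner
    pvc := &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "example-pvc",
            Namespace: "default",
            Annotations: map[string]string{
                // Opt in: the provisioner sets its finalizer before CreateVolume.
                "volume.kubernetes.io/provisioning-consistency": "enable",
            },
        },
        Spec: v1.PersistentVolumeClaimSpec{
            AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
            StorageClassName: &sc,
            // VolumeResourceRequirements assumes k8s.io/api >= v0.29.
            Resources: v1.VolumeResourceRequirements{
                Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Gi")},
            },
        },
    }
    if _, err := clientset.CoreV1().PersistentVolumeClaims("default").Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
        panic(err)
    }
}

With the annotation set to "enable", the provisioner adds the provisioner.storage.kubernetes.io/provisioning-protection finalizer before calling CreateVolume, and the new controller (sized by --provisioning-protection-threads) removes it once the claim is bound; setting it to "disable" lets a claim that is being deleted abandon provisioning, at the risk of leaking the volume.
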
60 changes: 56 additions & 4 deletions pkg/controller/controller.go
@@ -135,9 +135,10 @@ const (

annMigratedTo = "pv.kubernetes.io/migrated-to"
// TODO: Beta will be deprecated and removed in a later release
annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
annStorageProvisioner = "volume.kubernetes.io/storage-provisioner"
annSelectedNode = "volume.kubernetes.io/selected-node"
annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
annStorageProvisioner = "volume.kubernetes.io/storage-provisioner"
annSelectedNode = "volume.kubernetes.io/selected-node"
annProvisioningConsistency = "volume.kubernetes.io/provisioning-consistency"

// Annotation for secret name and namespace will be added to the pv object
// and used at pvc deletion time.
@@ -146,7 +147,8 @@

snapshotNotBound = "snapshot %s not bound"

pvcCloneFinalizer = "provisioner.storage.kubernetes.io/cloning-protection"
pvcCloneFinalizer = "provisioner.storage.kubernetes.io/cloning-protection"
pvcProvisioningFinalizer = "provisioner.storage.kubernetes.io/provisioning-protection"

annAllowVolumeModeChange = "snapshot.storage.kubernetes.io/allow-volume-mode-change"
)
@@ -801,6 +803,16 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi
}
}

// If provisioning consistency is disabled and the PVC is being deleted, remove the
// finalizer and abandon provisioning. The CSI driver may still be creating the
// volume, in which case the volume may leak.
if options.PVC.DeletionTimestamp != nil &&
options.PVC.Annotations != nil && options.PVC.Annotations[annProvisioningConsistency] == "disable" {
if err := p.removeProvisioningFinalizer(ctx, options.PVC); err != nil {
return nil, controller.ProvisioningNoChange, err
}
return nil, controller.ProvisioningFinished, nil
}

// The same check already ran in ShouldProvision, but perhaps
// it couldn't complete due to some unexpected error.
owned, err := p.checkNode(ctx, claim, options.StorageClass, "provision")
@@ -823,6 +835,14 @@
pvName := req.Name
provisionerCredentials := req.Secrets

// Add the finalizer before calling CreateVolume
if options.PVC.Annotations != nil && options.PVC.Annotations[annProvisioningConsistency] == "enable" {
err = p.setProvisioningFinalizer(ctx, options.PVC)
if err != nil {
return nil, controller.ProvisioningNoChange, err
}
}

createCtx := markAsMigrated(ctx, result.migratedVolume)
createCtx, cancel := context.WithTimeout(createCtx, p.timeout)
defer cancel()
@@ -992,6 +1012,38 @@ func (p *csiProvisioner) setCloneFinalizer(ctx context.Context, pvc *v1.Persiste
return nil
}

func (p *csiProvisioner) setProvisioningFinalizer(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
clone := claim.DeepCopy()
if checkFinalizer(clone, pvcProvisioningFinalizer) {
return nil
}
clone.Finalizers = append(clone.Finalizers, pvcProvisioningFinalizer)
if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
return err
}
klog.V(5).Infof("successfully set PVC ProvisioningFinalizer on %s", clone.Name)
return nil
}

func (p *csiProvisioner) removeProvisioningFinalizer(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
clone := claim.DeepCopy()
if !checkFinalizer(clone, pvcProvisioningFinalizer) {
return nil
}

newFinalizers := make([]string, 0, len(clone.Finalizers))
for _, f := range clone.GetFinalizers() {
if f == pvcProvisioningFinalizer {
continue
}
newFinalizers = append(newFinalizers, f)
}

clone.Finalizers = newFinalizers
if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
return err
}
klog.V(5).Infof("successfully removed PVC ProvisioningFinalizer from %s", clone.Name)
return nil
}

func (p *csiProvisioner) supportsTopology() bool {
return SupportsTopology(p.pluginCapabilities)
}
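
Both helpers above and the controller below depend on checkFinalizer, which this diff does not add; it already exists in pkg/controller for the cloning-protection feature. For readers without the full file, it is essentially a membership test over the object's finalizers; a rough sketch, not the verbatim upstream code:

package controller

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// checkFinalizer reports whether the given finalizer is present on the object.
func checkFinalizer(obj metav1.Object, finalizer string) bool {
    for _, f := range obj.GetFinalizers() {
        if f == finalizer {
            return true
        }
    }
    return false
}
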
174 changes: 174 additions & 0 deletions pkg/controller/provisioning_controller.go
@@ -0,0 +1,174 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/sig-storage-lib-external-provisioner/v9/controller"
)

type ProvisioningProtectionController struct {
client kubernetes.Interface
claimLister corelisters.PersistentVolumeClaimLister
claimInformer cache.SharedInformer
claimQueue workqueue.RateLimitingInterface
}

// NewProvisioningProtectionController creates a new controller that adds provisioning protection for CSI claims
func NewProvisioningProtectionController(
client kubernetes.Interface,
claimLister corelisters.PersistentVolumeClaimLister,
claimInformer cache.SharedInformer,
claimQueue workqueue.RateLimitingInterface,
) *ProvisioningProtectionController {
controller := &ProvisioningProtectionController{
client: client,
claimLister: claimLister,
claimInformer: claimInformer,
claimQueue: claimQueue,
}
return controller
}

// Run starts the ProvisioningProtectionController and blocks until the context is cancelled
func (p *ProvisioningProtectionController) Run(ctx context.Context, threadiness int) {
klog.Info("Starting ProvisioningProtection controller")
defer utilruntime.HandleCrash()
defer p.claimQueue.ShutDown()

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { p.enqueueClaimUpdate(obj) },
UpdateFunc: func(_ interface{}, newObj interface{}) { p.enqueueClaimUpdate(newObj) },
}
p.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.DefaultResyncPeriod)

for i := 0; i < threadiness; i++ {
go wait.Until(func() {
p.runClaimWorker(ctx)
}, time.Second, ctx.Done())
}

klog.Infof("Started ProvisioningProtection controller")
<-ctx.Done()
klog.Info("Shutting down ProvisioningProtection controller")
}

func (p *ProvisioningProtectionController) runClaimWorker(ctx context.Context) {
for p.processNextClaimWorkItem(ctx) {
}
}

// processNextClaimWorkItem processes items from claimQueue
func (p *ProvisioningProtectionController) processNextClaimWorkItem(ctx context.Context) bool {
obj, shutdown := p.claimQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer p.claimQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
p.claimQueue.Forget(obj)
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}

if err := p.syncClaimHandler(ctx, key); err != nil {
klog.Warningf("Retrying syncing claim %q after %v failures", key, p.claimQueue.NumRequeues(obj))
p.claimQueue.AddRateLimited(obj)
} else {
p.claimQueue.Forget(obj)
}

return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

// enqueueClaimUpdate takes a PVC obj and stores it into the claim work queue.
func (p *ProvisioningProtectionController) enqueueClaimUpdate(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
}

p.claimQueue.Add(key)
}

// syncClaimHandler gets the claim from the informer's cache and then calls syncClaim
func (p *ProvisioningProtectionController) syncClaimHandler(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

claim, err := p.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
if apierrs.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("item '%s' in work queue no longer exists", key))
return nil
}

return err
}

return p.syncClaim(ctx, claim)
}

// syncClaim removes the provisioning finalizer from a PVC once provisioning has finished
func (p *ProvisioningProtectionController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
if !checkFinalizer(claim, pvcProvisioningFinalizer) || claim.Spec.VolumeName == "" {
return nil
}

// Remove provision finalizer
finalizers := make([]string, 0)
for _, finalizer := range claim.ObjectMeta.Finalizers {
if finalizer != pvcProvisioningFinalizer {
finalizers = append(finalizers, finalizer)
}
}

clone := claim.DeepCopy()
clone.Finalizers = finalizers
if _, err := p.client.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
}
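
For completeness, a condensed sketch of how this controller is constructed and started, mirroring the cmd/csi-provisioner changes above. The in-cluster client setup, resync period, and module import path are assumptions for illustration, not part of the PR.

package main

import (
    "context"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/workqueue"

    // Module path/version assumed for illustration.
    ctrl "github.com/kubernetes-csi/external-provisioner/v5/pkg/controller"
)

func main() {
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
    claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
    claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

    rl := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)
    queue := workqueue.NewNamedRateLimitingQueue(rl, "provisioning-protection")

    pc := ctrl.NewProvisioningProtectionController(clientset, claimLister, claimInformer, queue)

    ctx := context.Background()
    factory.Start(ctx.Done())
    factory.WaitForCacheSync(ctx.Done())

    // Worker count mirrors the --provisioning-protection-threads default of 1.
    pc.Run(ctx, 1) // blocks until ctx is cancelled
}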