diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go
index 8cb6631176..ad013daa6d 100644
--- a/pkg/driver/controller.go
+++ b/pkg/driver/controller.go
@@ -37,16 +37,18 @@ var (
 )
 
 type controllerService struct {
-	juicefs juicefs.Interface
-	vols    map[string]int64
+	juicefs  juicefs.Interface
+	vols     map[string]int64
+	volLocks *util.VolumeLocks
 }
 
 func newControllerService(k8sClient *k8sclient.K8sClient) (controllerService, error) {
 	jfs := juicefs.NewJfsProvider(nil, k8sClient)
 	return controllerService{
-		juicefs: jfs,
-		vols:    make(map[string]int64),
+		juicefs:  jfs,
+		vols:     make(map[string]int64),
+		volLocks: util.NewVolumeLocks(),
 	}, nil
 }
 
@@ -142,6 +144,12 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return &csi.DeleteVolumeResponse{}, nil
 	}
 
+	if acquired := d.volLocks.TryAcquire(volumeID); !acquired {
+		klog.Errorf("DeleteVolume: Volume %q is being used by another operation", volumeID)
+		return nil, status.Errorf(codes.Aborted, "DeleteVolume: Volume %q is being used by another operation", volumeID)
+	}
+	defer d.volLocks.Release(volumeID)
+
 	klog.V(5).Infof("DeleteVolume: Deleting volume %q", volumeID)
 	err = d.juicefs.JfsDeleteVol(ctx, volumeID, volumeID, secrets, nil, nil)
 	if err != nil {
diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go
index c999fa46d3..d9b23d2dcf 100644
--- a/pkg/driver/controller_test.go
+++ b/pkg/driver/controller_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/juicedata/juicefs-csi-driver/pkg/juicefs"
 	"github.com/juicedata/juicefs-csi-driver/pkg/juicefs/mocks"
 	k8s "github.com/juicedata/juicefs-csi-driver/pkg/k8sclient"
+	"github.com/juicedata/juicefs-csi-driver/pkg/util"
 )
 
 func TestNewControllerService(t *testing.T) {
@@ -287,6 +288,7 @@ func TestDeleteVolume(t *testing.T) {
 			vols: map[string]int64{
 				volumeId: int64(1),
 			},
+			volLocks: util.NewVolumeLocks(),
 		}
 
 		_, err := juicefsDriver.DeleteVolume(ctx, req)
@@ -314,8 +316,9 @@ func TestDeleteVolume(t *testing.T) {
 		ctx := context.Background()
 
 		juicefsDriver := controllerService{
-			juicefs: nil,
-			vols:    make(map[string]int64),
+			juicefs:  nil,
+			vols:     make(map[string]int64),
+			volLocks: util.NewVolumeLocks(),
 		}
 
 		_, err := juicefsDriver.DeleteVolume(ctx, req)
@@ -349,8 +352,9 @@ func TestDeleteVolume(t *testing.T) {
 		mockJuicefs.EXPECT().JfsDeleteVol(context.TODO(), volumeId, volumeId, secret, nil, nil).Return(errors.New("test"))
 
 		juicefsDriver := controllerService{
-			juicefs: mockJuicefs,
-			vols:    map[string]int64{volumeId: int64(1)},
+			juicefs:  mockJuicefs,
+			vols:     map[string]int64{volumeId: int64(1)},
+			volLocks: util.NewVolumeLocks(),
 		}
 
 		_, err := juicefsDriver.DeleteVolume(ctx, req)
@@ -378,8 +382,9 @@ func TestDeleteVolume(t *testing.T) {
 		ctx := context.Background()
 
 		juicefsDriver := controllerService{
-			juicefs: nil,
-			vols:    make(map[string]int64),
+			juicefs:  nil,
+			vols:     make(map[string]int64),
+			volLocks: util.NewVolumeLocks(),
 		}
 
 		_, err := juicefsDriver.DeleteVolume(ctx, req)
diff --git a/pkg/juicefs/mount/builder/job.go b/pkg/juicefs/mount/builder/job.go
index b400567c06..7d68fd6920 100644
--- a/pkg/juicefs/mount/builder/job.go
+++ b/pkg/juicefs/mount/builder/job.go
@@ -91,6 +91,10 @@ func (r *JobBuilder) newJob(jobName string) *batchv1.Job {
 			Exec: &corev1.ExecAction{Command: []string{"sh", "-c", "umount /mnt/jfs -l && rmdir /mnt/jfs"}},
 		},
 	}
+	// set node name to empty to let the k8s scheduler choose a node
+	podTemplate.Spec.NodeName = ""
+	// set priority class name to empty so the job uses the default priority class
+	podTemplate.Spec.PriorityClassName = ""
 	podTemplate.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
 	job := batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/util/volume.go b/pkg/util/volume.go
index 740ba8962a..0760d44483 100644
--- a/pkg/util/volume.go
+++ b/pkg/util/volume.go
@@ -19,6 +19,7 @@ package util
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	corev1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -102,3 +103,31 @@ func getVol(ctx context.Context, client *k8sclient.K8sClient, pod *corev1.Pod, n
 	}
 	return
 }
+
+type VolumeLocks struct {
+	locks sync.Map
+	mux   sync.Mutex
+}
+
+func NewVolumeLocks() *VolumeLocks {
+	return &VolumeLocks{}
+}
+
+// TryAcquire acquires the lock on volumeID; it returns false if the lock
+// is already held by another operation.
+func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
+	vl.mux.Lock()
+	defer vl.mux.Unlock()
+	if _, ok := vl.locks.Load(volumeID); ok {
+		return false
+	}
+	vl.locks.Store(volumeID, nil)
+	return true
+}
+
+// Release deletes the lock on volumeID.
+func (vl *VolumeLocks) Release(volumeID string) {
+	vl.mux.Lock()
+	defer vl.mux.Unlock()
+	vl.locks.Delete(volumeID)
+}