Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk: re-implement StagingTargetPath cleanup logic #1248

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 60 additions & 58 deletions pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ const (
RundSocketDir = "/host/etc/kubernetes/volumes/rund/"
// VolumeDirRemove volume dir remove
VolumeDirRemove = "/host/etc/kubernetes/volumes/disk/remove"
// InputOutputErr tag
InputOutputErr = "input/output error"
// DiskMultiTenantEnable Enable disk multi-tenant mode
DiskMultiTenantEnable = "DISK_MULTI_TENANT_ENABLE"
// TenantUserUID tag
Expand Down Expand Up @@ -688,9 +686,44 @@ func addDiskXattr(diskID string) (err error) {
return unix.Setxattr(device, DiskXattrName, []byte(diskID), 0)
}

// target format: /var/lib/kubelet/plugins/kubernetes.io/csi/pv/pv-disk-1e7001e0-c54a-11e9-8f89-00163e0e78a0/globalmount
func ensureUnmounted(mounter k8smount.Interface, target string) error {
notmounted, err := mounter.IsLikelyNotMountPoint(target)
if err != nil {
return fmt.Errorf("failed to check if %s is not a mount point after unmount: %w", target, err)
}
if !notmounted {
return fmt.Errorf("path %s is still mounted after unmount", target)
}
return nil
}

func cleanupVolumeDeviceMount(path string) error {
err := unix.Unmount(path, 0)
if err != nil {
switch {
case errors.Is(err, unix.ENOENT):
return nil
case errors.Is(err, unix.EINVAL):
// Maybe not mounted, proceed to remove it. If not, unlink will report error.
default:
return err
}
}

errUnlink := unix.Unlink(path)
if errUnlink == nil {
return nil
}
if err != nil {
return fmt.Errorf("failed to unlink %s: %w", path, errUnlink)
} else {
return fmt.Errorf("failed to unmount %s: %w; then failed to unlink: %w", path, err, errUnlink)
}
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
klog.Infof("NodeUnstageVolume:: Starting to Unmount volume, volumeId: %s, target: %v", req.VolumeId, req.StagingTargetPath)
logger := klog.FromContext(ctx)
logger.Info("Starting to Unmount volume", "target", req.StagingTargetPath)

if !ns.locks.TryAcquire(req.VolumeId) {
return nil, status.Errorf(codes.Aborted, "There is already an operation for %s", req.VolumeId)
Expand All @@ -699,66 +732,35 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag

// check block device mountpoint
targetPath := req.GetStagingTargetPath()
tmpPath := filepath.Join(req.GetStagingTargetPath(), req.VolumeId)
if IsFileExisting(tmpPath) {
fileInfo, err := os.Lstat(tmpPath)
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), InputOutputErr) {
if err = isPathAvailiable(targetPath); err != nil {
if err = ns.k8smounter.Unmount(targetPath); err != nil {
klog.Errorf("NodeUnstageVolume: umount target %s(input/output error) with error: %v", targetPath, err)
return nil, status.Errorf(codes.InvalidArgument, "NodeUnstageVolume umount target %s with errror: %v", targetPath, err)
}
klog.Warningf("NodeUnstageVolume: target path %s show input/output error: %v, umount it.", targetPath, err)
}
} else {
klog.Errorf("NodeUnstageVolume: lstat mountpoint: %s with error: %s", tmpPath, err.Error())
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume: stat mountpoint error: "+err.Error())
}
} else {
if (fileInfo.Mode() & os.ModeDevice) != 0 {
klog.Infof("NodeUnstageVolume: mountpoint %s, is block device", tmpPath)
err := unix.Unmount(targetPath, 0)
if err != nil {
switch {
case errors.Is(err, unix.ENOENT):
logger.Info("targetPath not exist, continue to detach")
case errors.Is(err, unix.EINVAL):
// Maybe unmounted, lets check
if errCheck := ensureUnmounted(ns.k8smounter, targetPath); errCheck != nil {
return nil, status.Errorf(codes.Internal, "failed to unmount %s: %v. %v", targetPath, err, errCheck)
}
// if mountpoint not a block device, maybe something wrong happened in VolumeStageVolume.
// when pod deleted, the volume should be detached
targetPath = tmpPath
}
}

// Step 1: check folder exists and umount
msgLog := ""
if IsFileExisting(targetPath) {
notmounted, err := ns.k8smounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
klog.Errorf("NodeUnstageVolume: VolumeId: %s, check mountPoint: %s error: %v", req.VolumeId, targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
if !notmounted {
err = ns.k8smounter.Unmount(targetPath)
// really umounted, check volumeDevice
// Note: we remove the blockPath, but not targetPath, because the former is created by us, while the latter is created by CO.
blockPath := filepath.Join(targetPath, req.VolumeId)
logger.Info("targetPath may not be a mountpoint, checking volumeDevice")
err := cleanupVolumeDeviceMount(blockPath)
if err != nil {
klog.Errorf("NodeUnstageVolume: VolumeId: %s, umount path: %s failed with: %v", req.VolumeId, targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
notmounted, err = ns.k8smounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check if %s is not a mount point after umount: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "failed to cleanup volumeDevice path %s: %v", blockPath, err)
}
if !notmounted {
klog.Errorf("NodeUnstageVolume: TargetPath mounted yet, volumeId: %s, Target: %s", req.VolumeId, targetPath)
return nil, status.Error(codes.Internal, "NodeUnstageVolume: TargetPath mounted yet with target"+targetPath)
}
} else {
msgLog = fmt.Sprintf("NodeUnstageVolume: VolumeId: %s, mountpoint: %s not mounted, skipping and continue to detach", req.VolumeId, targetPath)
default:
return nil, status.Errorf(codes.Internal, "failed to unmount %s: %v", targetPath, err)
}
} else {
msgLog = fmt.Sprintf("NodeUnstageVolume: VolumeId: %s, Path %s doesn't exist, continue to detach", req.VolumeId, targetPath)
}

if msgLog == "" {
klog.Infof("NodeUnstageVolume: Unmount TargetPath successful, target %v, volumeId: %s", targetPath, req.VolumeId)
} else {
klog.Infof(msgLog)
err := ensureUnmounted(ns.k8smounter, targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
logger.V(2).Info("targetPath cleaned up")

if IsVFNode() {
if err := unbindBdfDisk(req.VolumeId); err != nil {
Expand All @@ -777,7 +779,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
}

err := addDiskXattr(req.VolumeId)
err = addDiskXattr(req.VolumeId)
if err != nil {
klog.Errorf("NodeUnstageVolume: addDiskXattr %s failed: %v", req.VolumeId, err)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/disk/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,20 +872,6 @@ func GetVolumeDeviceName(diskID string) (string, error) {
return device, err
}

// isPathAvailiable
func isPathAvailiable(path string) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("Open Path (%s) with error: %v ", path, err)
}
defer f.Close()
_, err = f.Readdirnames(1)
if err != nil && err != io.EOF {
return fmt.Errorf("Read Path (%s) with error: %v ", path, err)
}
return nil
}

func getBlockDeviceCapacity(devicePath string) int64 {

file, err := os.Open(devicePath)
Expand Down