diff --git a/.golangci.yml b/.golangci.yml
index fcbe83b..2415d9f 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -3,12 +3,11 @@
 
 # options for analysis running
 run:
-  go: '1.21'
   # default concurrency is a available CPU number
   # concurrency: 4
 
   # timeout for analysis, e.g. 30s, 5m, default is 1m
-  deadline: 10m
+  timeout: 10m
 
   # exit code when at least one issue was found, default is 1
   issues-exit-code: 1
@@ -20,7 +19,7 @@ run:
   # won't be reported. Default value is empty list, but there is
   # no need to include all autogenerated files, we confidently recognize
   # autogenerated files. If it's not please let us know.
-  skip-files:
+  exclude-files:
     - charts/
     - docs/
 
@@ -35,7 +34,13 @@ run:
 # output configuration options
 output:
   # colored-line-number|line-number|json|tab|checkstyle, default is "colored-line-number"
-  format: line-number
+  formats:
+    - format: line-number
+      path: stdout
+  print-issued-lines: true
+  print-linter-name: true
+  uniq-by-line: true
+  sort-results: true
 
 # all available settings of specific linters
 linters-settings:
@@ -47,9 +52,7 @@ linters-settings:
     # report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`;
     # default is false: such cases aren't reported by default.
     check-blank: true
-  govet:
-    # report about shadowed variables
-    check-shadowing: true
+  govet: {}
   gofmt:
     # simplify code: gofmt with `-s` option, true by default
     simplify: true
@@ -168,11 +171,19 @@ linters:
     - perfsprint
     - protogetter
 
+    # temporarily disabled linters
+    - copyloopvar
+
     # abandoned linters for which golangci shows the warning that the repo is archived by the owner
     - golint
     - interfacer
     - maligned
     - scopelint
+    - varcheck
+    - structcheck
+    - deadcode
+    - ifshort
+    - perfsprint
 
   disable-all: false
   fast: false
diff --git a/cmd/pvecsictl/main.go b/cmd/pvecsictl/main.go
index 764bdac..0120dd2 100644
--- a/cmd/pvecsictl/main.go
+++ b/cmd/pvecsictl/main.go
@@ -85,6 +85,7 @@ func run() int {
 
 	cmd.AddCommand(buildMigrateCmd())
 	cmd.AddCommand(buildRenameCmd())
+	cmd.AddCommand(buildSwapCmd())
 
 	err := cmd.ExecuteContext(ctx)
 	if err != nil {
diff --git a/cmd/pvecsictl/rename.go b/cmd/pvecsictl/rename.go
index 0aeb7a8..098c655 100644
--- a/cmd/pvecsictl/rename.go
+++ b/cmd/pvecsictl/rename.go
@@ -57,10 +57,9 @@ func buildRenameCmd() *cobra.Command {
 func setrenameCmdFlags(cmd *cobra.Command) {
 	flags := cmd.Flags()
 
-	flags.StringP("namespace", "n", "", "namespace of the persistentvolumeclaims")
+	flags.StringP("namespace", "n", "", "namespace of the PersistentVolumeClaims")
 
-	flags.BoolP("force", "f", false, "force migration even if the persistentvolumeclaims is in use")
-	flags.Int("timeout", 120, "task timeout in seconds")
+	flags.BoolP("force", "f", false, "force migration even if the PersistentVolumeClaims is in use")
 }
 
 // nolint: cyclop, gocyclo
@@ -84,25 +83,30 @@ func (c *renameCmd) runRename(cmd *cobra.Command, args []string) error {
 
 	cordonedNodes := []string{}
 
-	if len(pods) > 0 {
-		if force {
-			logger.Infof("persistentvolumeclaims is using by pods: %s on node %s, trying to force migration\n", strings.Join(pods, ","), vmName)
-
-			var csiNodes []string
+	defer func() {
+		if len(cordonedNodes) > 0 {
+			logger.Infof("uncordoning nodes: %s", strings.Join(cordonedNodes, ","))
 
-			csiNodes, err = tools.CSINodes(ctx, c.kclient, srcPV.Spec.CSI.Driver)
-			if err != nil {
-				return err
+			if err = tools.UncondonNodes(ctx, c.kclient, cordonedNodes); err != nil {
+				logger.Errorf("failed to uncordon nodes: %v", err)
 			}
+		}
+	}()
 
-			cordonedNodes = append(cordonedNodes, csiNodes...)
+	if len(pods) > 0 {
+		if force {
+			if srcPV.Spec.CSI == nil {
+				return fmt.Errorf("only CSI PersistentVolumes can be swapped in force mode")
+			}
 
-			logger.Infof("cordoning nodes: %s", strings.Join(cordonedNodes, ","))
+			logger.Infof("persistentvolumeclaims is using by pods: %s on node %s, trying to force migration\n", strings.Join(pods, ","), vmName)
 
-			if _, err = tools.CondonNodes(ctx, c.kclient, cordonedNodes); err != nil {
+			cordonedNodes, err = cordoneNodeWithPVs(ctx, c.kclient, srcPV)
+			if err != nil {
 				return fmt.Errorf("failed to cordon nodes: %v", err)
 			}
 
+			logger.Infof("cordoned nodes: %s", strings.Join(cordonedNodes, ","))
 			logger.Infof("terminated pods: %s", strings.Join(pods, ","))
 
 			for _, pod := range pods {
@@ -134,15 +138,9 @@ func (c *renameCmd) runRename(cmd *cobra.Command, args []string) error {
 
 	err = renamePVC(ctx, c.kclient, c.namespace, srcPVC, srcPV, args[1])
 	if err != nil {
-		return fmt.Errorf("failed to rename persistentvolumeclaims: %v", err)
-	}
-
-	if force {
-		logger.Infof("uncordoning nodes: %s", strings.Join(cordonedNodes, ","))
+		cordonedNodes = []string{}
 
-		if err = tools.UncondonNodes(ctx, c.kclient, cordonedNodes); err != nil {
-			return fmt.Errorf("failed to uncordon nodes: %v", err)
-		}
+		return fmt.Errorf("failed to rename persistentvolumeclaims: %v", err)
 	}
 
 	logger.Infof("persistentvolumeclaims %s has been renamed", args[0])
diff --git a/cmd/pvecsictl/swap.go b/cmd/pvecsictl/swap.go
new file mode 100644
index 0000000..8e9a07d
--- /dev/null
+++ b/cmd/pvecsictl/swap.go
@@ -0,0 +1,218 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	cobra "github.com/spf13/cobra"
+
+	tools "github.com/sergelogvinov/proxmox-csi-plugin/pkg/tools"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientkubernetes "k8s.io/client-go/kubernetes"
+)
+
+type swapCmd struct {
+	kclient   *clientkubernetes.Clientset
+	namespace string
+}
+
+func buildSwapCmd() *cobra.Command {
+	c := &swapCmd{}
+
+	cmd := cobra.Command{
+		Use:           "swap pvc-a pvc-b",
+		Aliases:       []string{"sw"},
+		Short:         "Swap PersistentVolumes between two PersistentVolumeClaims",
+		Args:          cobra.ExactArgs(2),
+		PreRunE:       c.swapValidate,
+		RunE:          c.runSwap,
+		SilenceUsage:  true,
+		SilenceErrors: true,
+	}
+
+	setSwapCmdFlags(&cmd)
+
+	return &cmd
+}
+
+func setSwapCmdFlags(cmd *cobra.Command) {
+	flags := cmd.Flags()
+
+	flags.StringP("namespace", "n", "", "namespace of the PersistentVolumeClaims")
+
+	flags.BoolP("force", "f", false, "force migration even if the PersistentVolumeClaims is in use")
+}
+
+// nolint: cyclop, gocyclo
+func (c *swapCmd) runSwap(cmd *cobra.Command, args []string) error {
+	flags := cmd.Flags()
+	force, _ := flags.GetBool("force") //nolint: errcheck
+
+	var err error
+
+	ctx := context.Background()
+
+	srcPVC, srcPV, err := tools.PVCResources(ctx, c.kclient, c.namespace, args[0])
+	if err != nil {
+		return fmt.Errorf("failed to get resources: %v", err)
+	}
+
+	srcPods, srcVMName, err := tools.PVCPodUsage(ctx, c.kclient, c.namespace, args[0])
+	if err != nil {
+		return fmt.Errorf("failed to find pods using pvc: %v", err)
+	}
+
+	dstPVC, dstPV, err := tools.PVCResources(ctx, c.kclient, c.namespace, args[1])
+	if err != nil {
+		return fmt.Errorf("failed to get resources: %v", err)
+	}
+
+	dstPods, dstVMName, err := tools.PVCPodUsage(ctx, c.kclient, c.namespace, args[1])
+	if err != nil {
+		return fmt.Errorf("failed to find pods using pvc: %v", err)
+	}
+
+	cordonedNodes := []string{}
+
+	defer func() {
+		if len(cordonedNodes) > 0 {
+			logger.Infof("uncordoning nodes: %s", strings.Join(cordonedNodes, ","))
+
+			if err = tools.UncondonNodes(ctx, c.kclient, cordonedNodes); err != nil {
+				logger.Errorf("failed to uncordon nodes: %v", err)
+			}
+		}
+	}()
+
+	if len(srcPods) > 0 || len(dstPods) > 0 {
+		if force {
+			var csiNodes []string
+
+			if srcPV.Spec.CSI == nil || dstPV.Spec.CSI == nil {
+				return fmt.Errorf("only CSI PersistentVolumes can be swapped in force mode")
+			}
+
+			if len(srcPods) > 0 {
+				logger.Infof("persistentvolumeclaims is using by pods: %s on node %s, trying to force swap\n", strings.Join(srcPods, ","), srcVMName)
+
+				csiNodes, err = cordoneNodeWithPVs(ctx, c.kclient, srcPV)
+				if err != nil {
+					return fmt.Errorf("failed to cordon nodes: %v", err)
+				}
+
+				cordonedNodes = append(cordonedNodes, csiNodes...)
+			}
+
+			if len(dstPods) > 0 {
+				logger.Infof("persistentvolumeclaims is using by pods: %s on node %s, trying to force swap\n", strings.Join(dstPods, ","), dstVMName)
+
+				csiNodes, err = cordoneNodeWithPVs(ctx, c.kclient, dstPV)
+				if err != nil {
+					return fmt.Errorf("failed to cordon nodes: %v", err)
+				}
+
+				cordonedNodes = append(cordonedNodes, csiNodes...)
+			}
+
+			logger.Infof("cordoned nodes: %s", strings.Join(cordonedNodes, ","))
+
+			pods := srcPods
+			pods = append(pods, dstPods...)
+
+			logger.Infof("terminated pods: %s", strings.Join(pods, ","))
+
+			for _, pod := range pods {
+				if err = c.kclient.CoreV1().Pods(c.namespace).Delete(ctx, pod, metav1.DeleteOptions{}); err != nil {
+					return fmt.Errorf("failed to delete pod: %v", err)
+				}
+			}
+
+			waitPods := func(pod string) error {
+				for {
+					p, _, e := tools.PVCPodUsage(ctx, c.kclient, c.namespace, pod)
+					if e != nil {
+						return fmt.Errorf("failed to find pods using pvc: %v", e)
+					}
+
+					if len(p) == 0 {
+						break
+					}
+
+					logger.Infof("waiting pods: %s", strings.Join(p, " "))
+
+					time.Sleep(2 * time.Second)
+				}
+
+				return nil
+			}
+
+			if err = waitPods(args[0]); err != nil {
+				return err
+			}
+
+			if err = waitPods(args[1]); err != nil {
+				return err
+			}
+
+			time.Sleep(5 * time.Second)
+		} else {
+			if len(srcPods) > 0 {
+				return fmt.Errorf("persistentvolumeclaims is using by pods: %s on the node %s, cannot swap pvc", strings.Join(srcPods, ","), srcVMName)
+			}
+
+			if len(dstPods) > 0 {
+				return fmt.Errorf("persistentvolumeclaims is using by pods: %s on the node %s, cannot swap pvc", strings.Join(dstPods, ","), dstVMName)
+			}
+		}
+	}
+
+	err = swapPVC(ctx, c.kclient, c.namespace, srcPVC, srcPV, dstPVC, dstPV)
+	if err != nil {
+		cordonedNodes = []string{}
+
+		return fmt.Errorf("failed to swap persistentvolumeclaims: %v", err)
+	}
+
+	logger.Infof("persistentvolumeclaims %s,%s has been swapped", args[0], args[1])
+
+	return nil
+}
+
+func (c *swapCmd) swapValidate(cmd *cobra.Command, _ []string) error {
+	flags := cmd.Flags()
+
+	namespace, _ := flags.GetString("namespace") //nolint: errcheck
+
+	kclientConfig, namespace, err := tools.BuildConfig(kubeconfig, namespace)
+	if err != nil {
+		return fmt.Errorf("failed to create kubernetes config: %v", err)
+	}
+
+	c.kclient, err = clientkubernetes.NewForConfig(kclientConfig)
+	if err != nil {
+		return fmt.Errorf("failed to create kubernetes client: %v", err)
+	}
+
+	c.namespace = namespace
+
+	return nil
+}
diff --git a/cmd/pvecsictl/utils.go b/cmd/pvecsictl/utils.go
index f921ce9..e8abc45 100644
--- a/cmd/pvecsictl/utils.go
+++ b/cmd/pvecsictl/utils.go
@@ -30,6 +30,28 @@ import (
 	clientkubernetes "k8s.io/client-go/kubernetes"
 )
 
+func cordoneNodeWithPVs(
+	ctx context.Context,
+	kclient *clientkubernetes.Clientset,
+	pv *corev1.PersistentVolume,
+) ([]string, error) {
+	var (
+		err      error
+		csiNodes []string
+	)
+
+	csiNodes, err = tools.CSINodes(ctx, kclient, pv.Spec.CSI.Driver)
+	if err != nil {
+		return nil, err
+	}
+
+	if _, err = tools.CondonNodes(ctx, kclient, csiNodes); err != nil {
+		return nil, err
+	}
+
+	return csiNodes, nil
+}
+
 func replacePVTopology(
 	ctx context.Context,
 	clientset *clientkubernetes.Clientset,
@@ -125,26 +147,96 @@ func renamePVC(
 	if pv.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimDelete {
 		if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, pvc.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
-			return fmt.Errorf("failed to patch PersistentVolumes: %v", err)
+			return fmt.Errorf("failed to patch PersistentVolume: %v", err)
 		}
 	}
 
 	policy := metav1.DeletePropagationForeground
 
 	if err := clientset.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{PropagationPolicy: &policy}); err != nil {
-		return fmt.Errorf("failed to delete PersistentVolumeClaims: %v", err)
+		return fmt.Errorf("failed to delete PersistentVolumeClaim: %v", err)
 	}
 
 	patch = []byte(`{"spec":{"claimRef":null}}`)
 
 	if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, pvc.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
-		return fmt.Errorf("failed to patch PersistentVolumes: %v", err)
+		return fmt.Errorf("failed to patch PersistentVolume: %v", err)
 	}
 
-	if _, err := clientset.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, newPVC, metav1.CreateOptions{}); err != nil {
-		if _, err := clientset.CoreV1().PersistentVolumeClaims(namespace).Update(ctx, newPVC, metav1.UpdateOptions{}); err != nil {
-			return fmt.Errorf("failed to create/update PersistentVolumeClaims: %v", err)
+	if _, err := tools.PVCCreateOrUpdate(ctx, clientset, newPVC); err != nil {
+		return fmt.Errorf("failed to create/update PersistentVolumeClaim %s: %v", newPVC.Name, err)
+	}
+
+	return nil
+}
+
+func swapPVC(
+	ctx context.Context,
+	clientset *clientkubernetes.Clientset,
+	namespace string,
+	srcPVC *corev1.PersistentVolumeClaim,
+	srcPV *corev1.PersistentVolume,
+	dstPVC *corev1.PersistentVolumeClaim,
+	dstPV *corev1.PersistentVolume,
+) error {
+	newSrcPVC := srcPVC.DeepCopy()
+	newSrcPVC.ObjectMeta.Name = dstPVC.ObjectMeta.Name
+	newSrcPVC.ObjectMeta.UID = ""
+	newSrcPVC.ObjectMeta.ResourceVersion = ""
+	newSrcPVC.Status = corev1.PersistentVolumeClaimStatus{}
+	newSrcPVC.Spec.Resources.Requests = corev1.ResourceList{
+		corev1.ResourceStorage: srcPVC.Status.Capacity[corev1.ResourceStorage],
+	}
+
+	newDstPVC := dstPVC.DeepCopy()
+	newDstPVC.ObjectMeta.Name = srcPVC.ObjectMeta.Name
+	newDstPVC.ObjectMeta.UID = ""
+	newDstPVC.ObjectMeta.ResourceVersion = ""
+	newDstPVC.Status = corev1.PersistentVolumeClaimStatus{}
+	newDstPVC.Spec.Resources.Requests = corev1.ResourceList{
+		corev1.ResourceStorage: dstPVC.Status.Capacity[corev1.ResourceStorage],
+	}
+
+	patch := []byte(`{"spec":{"persistentVolumeReclaimPolicy":"` + corev1.PersistentVolumeReclaimRetain + `"}}`)
+
+	if srcPV.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimDelete {
+		if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, srcPVC.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+			return fmt.Errorf("failed to patch PersistentVolume: %v", err)
+		}
+	}
+
+	if dstPV.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimDelete {
+		if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, dstPVC.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+			return fmt.Errorf("failed to patch PersistentVolume: %v", err)
 		}
 	}
 
+	policy := metav1.DeletePropagationForeground
+
+	if err := clientset.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, srcPVC.Name, metav1.DeleteOptions{PropagationPolicy: &policy}); err != nil {
+		return fmt.Errorf("failed to delete PersistentVolumeClaim: %v", err)
+	}
+
+	if err := clientset.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, dstPVC.Name, metav1.DeleteOptions{PropagationPolicy: &policy}); err != nil {
+		return fmt.Errorf("failed to delete PersistentVolumeClaim: %v", err)
+	}
+
+	patch = []byte(`{"spec":{"claimRef":null}}`)
+
+	if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, srcPVC.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+		return fmt.Errorf("failed to patch PersistentVolume: %v", err)
+	}
+
+	if _, err := clientset.CoreV1().PersistentVolumes().Patch(ctx, dstPVC.Spec.VolumeName, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+		return fmt.Errorf("failed to patch PersistentVolume: %v", err)
+	}
+
+	if _, err := tools.PVCCreateOrUpdate(ctx, clientset, newSrcPVC); err != nil {
+		return fmt.Errorf("failed to create/update PersistentVolumeClaim %s: %v", newSrcPVC.Name, err)
+	}
+
+	if _, err := tools.PVCCreateOrUpdate(ctx, clientset, newDstPVC); err != nil {
+		return fmt.Errorf("failed to create/update PersistentVolumeClaim %s: %v", newDstPVC.Name, err)
+	}
+
 	return nil
 }
diff --git a/docs/pvecsictl.md b/docs/pvecsictl.md
index 7421682..fc25f6a 100644
--- a/docs/pvecsictl.md
+++ b/docs/pvecsictl.md
@@ -169,6 +169,53 @@ test-0   0/1     ContainerCreating   0          13s     <none>        kube-store
 test-0   1/1     Running             0          24s     10.32.19.17   kube-store-11
 ```
 
+### Swap
+
+Swap the PersistentVolumes between two PersistentVolumeClaims.
+
+Check the current PVCs:
+
+```shell
+# kubectl get pods,pvc
+NAME         READY   STATUS    RESTARTS   AGE
+pod/test-0   1/1     Running   0          2m58s
+pod/test-1   1/1     Running   0          2m58s
+
+NAME                                   STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
+persistentvolumeclaim/storage-test-0   Bound    pvc-e248bc56-dcf4-4145-93b9-a374a7c3b900   10Gi       RWO            proxmox-lvm    <unset>                 2m51s
+persistentvolumeclaim/storage-test-1   Bound    pvc-41b7078d-aa9f-4757-9056-8bd1e8e0697f   15Gi       RWO            proxmox-lvm    <unset>                 2m52s
+```
+
+Swap PVCs:
+
+```shell
+pvecsictl swap -n default storage-test-0 storage-test-1 -f
+
+INFO persistentvolumeclaims is using by pods: test-0 on node builder-03a, trying to force swap
+INFO persistentvolumeclaims is using by pods: test-1 on node builder-04b, trying to force swap
+INFO cordoned nodes: builder-03a,builder-03b,builder-04a,builder-04b
+INFO terminated pods: test-0,test-1
+INFO waiting pods: test-0
+INFO waiting pods: test-0
+INFO persistentvolumeclaims storage-test-0,storage-test-1 has been swapped
+INFO uncordoning nodes: builder-03a,builder-03b,builder-04a,builder-04b
+```
+
+Check the result:
+
+* storage-test-0 <-> storage-test-1
+
+```shell
+# kubectl get pods,pvc
+NAME         READY   STATUS    RESTARTS   AGE
+pod/test-0   1/1     Running   0          19s
+pod/test-1   1/1     Running   0          19s
+
+NAME                                   STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
+persistentvolumeclaim/storage-test-0   Bound    pvc-41b7078d-aa9f-4757-9056-8bd1e8e0697f   15Gi       RWO            proxmox-lvm    <unset>                 13s
+persistentvolumeclaim/storage-test-1   Bound    pvc-e248bc56-dcf4-4145-93b9-a374a7c3b900   10Gi       RWO            proxmox-lvm    <unset>                 13s
+```
+
 # Feedback
 
 Use the [github discussions](https://github.com/sergelogvinov/proxmox-csi-plugin/discussions) for feedback and questions.
diff --git a/pkg/proxmox/volume.go b/pkg/proxmox/volume.go
index e07e53e..a4b95cb 100644
--- a/pkg/proxmox/volume.go
+++ b/pkg/proxmox/volume.go
@@ -90,7 +90,7 @@ func MoveQemuDisk(cluster *pxapi.Client, vol *volume.Volume, node string, taskTi
 		return fmt.Errorf("failed to parse response: %v", err)
 	}
 
-	for i := 0; i < 3; i++ {
+	for range 3 {
 		if _, err = cluster.WaitForCompletion(taskResponse); err != nil {
 			time.Sleep(2 * time.Second)
 
diff --git a/pkg/tools/pv.go b/pkg/tools/pv.go
index 0828972..62091fa 100644
--- a/pkg/tools/pv.go
+++ b/pkg/tools/pv.go
@@ -18,11 +18,13 @@ package tools
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/watch"
 	clientkubernetes "k8s.io/client-go/kubernetes"
 )
@@ -65,6 +67,31 @@ func PVCPodUsage(ctx context.Context, clientset *clientkubernetes.Clientset, nam
 	return pods, node, nil
 }
 
+// PVCCreateOrUpdate creates or updates the specified PersistentVolumeClaim resource.
+func PVCCreateOrUpdate(
+	ctx context.Context,
+	clientset *clientkubernetes.Clientset,
+	pvc *corev1.PersistentVolumeClaim,
+) (*corev1.PersistentVolumeClaim, error) {
+	res, err := clientset.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
+	if err != nil {
+		patch := corev1.PersistentVolumeClaim{
+			Spec: corev1.PersistentVolumeClaimSpec{
+				VolumeName: pvc.Spec.VolumeName,
+			},
+		}
+
+		patchBytes, err := json.Marshal(&patch)
+		if err != nil {
+			return nil, fmt.Errorf("failed to json.Marshal PVC: %w", err)
+		}
+
+		return clientset.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
+	}
+
+	return res, err
+}
+
 // PVWaitDelete waits for the specified PersistentVolume to be deleted.
 func PVWaitDelete(ctx context.Context, clientset *clientkubernetes.Clientset, pvName string) error {
 	_, err := clientset.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})