From 957d5095c032068a1e85e9113d5d863ca749318a Mon Sep 17 00:00:00 2001 From: Rafael Mendez Date: Thu, 13 Jun 2024 20:29:05 +0000 Subject: [PATCH] Adding Feature To Modify Tags Of Existing Volumes Through VolumeAttributesClass --- .../templates/controller.yaml | 8 +- cmd/main.go | 4 +- docs/example-iam-policy.json | 10 +- docs/tagging.md | 31 +++++ examples/kubernetes/modify-volume/README.md | 10 +- .../manifests/volumeattributesclass.yaml | 2 + go.mod | 2 +- go.sum | 4 +- hack/e2e/kops/patch-cluster.yaml | 10 +- pkg/cloud/cloud.go | 42 +++++++ pkg/cloud/cloud_test.go | 115 ++++++++++++++++++ pkg/cloud/ec2_interface.go | 1 + pkg/cloud/interface.go | 1 + pkg/cloud/metadata/k8s.go | 90 ++++++++------ pkg/cloud/mock_cloud.go | 14 +++ pkg/cloud/mock_ec2.go | 20 +++ pkg/coalescer/coalescer.go | 10 +- pkg/driver/controller.go | 15 +-- pkg/driver/controller_modify_volume.go | 87 ++++++++++--- pkg/driver/controller_modify_volume_test.go | 76 +++++++++--- pkg/driver/options.go | 6 + pkg/driver/request_coalescing_test.go | 2 - tests/e2e/modify_volume.go | 25 +++- tests/e2e/testsuites/e2e_utils.go | 2 + tests/e2e/testsuites/modify_volume_tester.go | 1 + 25 files changed, 469 insertions(+), 119 deletions(-) diff --git a/charts/aws-ebs-csi-driver/templates/controller.yaml b/charts/aws-ebs-csi-driver/templates/controller.yaml index 6b393f401a..53c242e603 100644 --- a/charts/aws-ebs-csi-driver/templates/controller.yaml +++ b/charts/aws-ebs-csi-driver/templates/controller.yaml @@ -513,7 +513,7 @@ spec: {{- with .Values.controller.volumes }} {{- toYaml . | nindent 8 }} {{- end }} - {{- if .Values.controller.dnsConfig }} - dnsConfig: - {{- toYaml .Values.controller.dnsConfig | nindent 4 }} - {{- end }} + {{- if .Values.controller.dnsConfig }} + dnsConfig: + {{- toYaml .Values.controller.dnsConfig | nindent 8 }} + {{- end }} diff --git a/cmd/main.go b/cmd/main.go index 496334d1f4..1428a69dc1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -68,7 +68,7 @@ func main() { switch cmd { case "pre-stop-hook": - clientset, clientErr := metadata.DefaultKubernetesAPIClient() + clientset, clientErr := metadata.DefaultKubernetesAPIClient(options.Kubeconfig)() if clientErr != nil { klog.ErrorS(err, "unable to communicate with k8s API") } else { @@ -140,7 +140,7 @@ func main() { cfg := metadata.MetadataServiceConfig{ EC2MetadataClient: metadata.DefaultEC2MetadataClient, - K8sAPIClient: metadata.DefaultKubernetesAPIClient, + K8sAPIClient: metadata.DefaultKubernetesAPIClient(options.Kubeconfig), } region := os.Getenv("AWS_REGION") diff --git a/docs/example-iam-policy.json b/docs/example-iam-policy.json index 57ae140b47..a85b7b2915 100644 --- a/docs/example-iam-policy.json +++ b/docs/example-iam-policy.json @@ -25,15 +25,7 @@ "Resource": [ "arn:aws:ec2:*:*:volume/*", "arn:aws:ec2:*:*:snapshot/*" - ], - "Condition": { - "StringEquals": { - "ec2:CreateAction": [ - "CreateVolume", - "CreateSnapshot" - ] - } - } + ] }, { "Effect": "Allow", diff --git a/docs/tagging.md b/docs/tagging.md index 5b22a78e42..31085df173 100644 --- a/docs/tagging.md +++ b/docs/tagging.md @@ -96,6 +96,37 @@ backup=true billingID=ABCDEF ``` +# Adding, Modifying, and Deleting Tags Of Existing Volumes +The AWS EBS CSI Driver supports the modifying of tags of existing volumes through `VolumeAttributesClass.parameters` the examples below show the syntax for addition, modification, and deletion of tags within the `VolumeAttributesClass.parameters`. For a walkthrough on how to apply these modifications to a volume follow the [walkthrough for Volume Modification via VolumeAttributeClass](../examples/kubernetes/modify-volume) + +**Syntax for Adding or Modifying a Tag** + +If a key has the prefix `tagSpecification`, the CSI driver will treat the value as a key-value pair to be added to the existing volume. If there is already an existing tag with the specified key, the CSI driver will overwrite the value of that tag with the new value specified. +``` +apiVersion: storage.k8s.io/v1alpha1 +kind: VolumeAttributesClass +metadata: + name: io2-class +driverName: ebs.csi.aws.com +parameters: + tagSpecification_1: "location=Seattle" + tagSpecification_2: "cost-center=" // If the value is left blank, tag is created with an empty value +``` +**Syntax for Deleting a Tag** + +If a key has the prefix `tagDeletion`, the CSI driver will treat the value as a tag key, and the existing tag with that key will be removed from the volume. +``` +apiVersion: storage.k8s.io/v1alpha1 +kind: VolumeAttributesClass +metadata: + name: io2-class +driverName: ebs.csi.aws.com +parameters: + tagDeletion_1: "location" // Deletes tag with key "location" + tagDeletion_2: "cost-center" +``` + + # Snapshot Tagging The AWS EBS CSI Driver supports tagging snapshots through `VolumeSnapshotClass.parameters`, similarly to StorageClass tagging. diff --git a/examples/kubernetes/modify-volume/README.md b/examples/kubernetes/modify-volume/README.md index e492a9ea03..43252ec576 100644 --- a/examples/kubernetes/modify-volume/README.md +++ b/examples/kubernetes/modify-volume/README.md @@ -35,21 +35,25 @@ This example will only work on a cluster with the `VolumeAttributesClass` featur Mon Feb 26 22:28:39 UTC 2024 ... ``` +4. Deploy the `VolumeAttributesClass` + ```sh + $ kubectl apply -f manifests/volumeattributesclass.yaml + ``` -4. Simultaneously, deploy the `VolumeAttributesClass` and edit the `PersistentVolumeClaim` to point to this class +5. Edit the `PersistentVolumeClaim` to point to this class ```sh $ kubectl patch pvc ebs-claim --patch '{"spec": {"volumeAttributesClassName": "io2-class"}}' persistentvolumeclaim/ebs-claim patched ``` -5. Wait for the `VolumeAttributesClass` to apply to the volume +6. Wait for the `VolumeAttributesClass` to apply to the volume ```sh $ kubectl get pvc ebs-claim NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS VOLUMEATTRIBUTESCLASS AGE ebs-claim Bound pvc-076b2d14-b643-47d4-a2ce-fbf9cd36572b 100Gi RWO ebs-sc io2-class 5m54s ``` -6. (Optional) Delete example resources +7. (Optional) Delete example resources ```sh $ kubectl delete -f manifests storageclass.storage.k8s.io "ebs-sc" deleted diff --git a/examples/kubernetes/modify-volume/manifests/volumeattributesclass.yaml b/examples/kubernetes/modify-volume/manifests/volumeattributesclass.yaml index 4945b190a9..5274403fa6 100644 --- a/examples/kubernetes/modify-volume/manifests/volumeattributesclass.yaml +++ b/examples/kubernetes/modify-volume/manifests/volumeattributesclass.yaml @@ -21,3 +21,5 @@ driverName: ebs.csi.aws.com parameters: type: io2 iops: "10000" + tagSpecification_1: "location=Seattle" + tagSpecification_2: "cost-center=" diff --git a/go.mod b/go.mod index c7ff803c25..814a46ae6e 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 go.opentelemetry.io/otel/sdk v1.27.0 golang.org/x/sys v0.21.0 - google.golang.org/grpc v1.64.0 + google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 k8s.io/api v0.31.0-alpha.2 k8s.io/apimachinery v0.31.0-alpha.2 diff --git a/go.sum b/go.sum index 1f3302125d..9d300a8635 100644 --- a/go.sum +++ b/go.sum @@ -1913,8 +1913,8 @@ google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5v google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= +google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/hack/e2e/kops/patch-cluster.yaml b/hack/e2e/kops/patch-cluster.yaml index db82b76232..d441869ecb 100644 --- a/hack/e2e/kops/patch-cluster.yaml +++ b/hack/e2e/kops/patch-cluster.yaml @@ -49,15 +49,7 @@ spec: "Resource": [ "arn:aws:ec2:*:*:volume/*", "arn:aws:ec2:*:*:snapshot/*" - ], - "Condition": { - "StringEquals": { - "ec2:CreateAction": [ - "CreateVolume", - "CreateSnapshot" - ] - } - } + ] }, { "Effect": "Allow", diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index e31e6752ad..36f59d05ef 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -229,6 +229,12 @@ type ModifyDiskOptions struct { Throughput int32 } +// ModifyTagsOptions represents parameter to modify the tags of an existing EBS volume +type ModifyTagsOptions struct { + TagsToAdd map[string]string + TagsToDelete []string +} + // Snapshot represents an EBS volume snapshot type Snapshot struct { SnapshotID string @@ -746,6 +752,42 @@ func (c *cloud) batchDescribeVolumesModifications(request *ec2.DescribeVolumesMo return r.Result, nil } +// ModifyTags adds, updates, and deletes tags for the specified EBS volume. +func (c *cloud) ModifyTags(ctx context.Context, volumeID string, tagOptions ModifyTagsOptions) error { + if len(tagOptions.TagsToDelete) > 0 { + deleteTagsInput := &ec2.DeleteTagsInput{ + Resources: []string{volumeID}, + Tags: make([]types.Tag, 0, len(tagOptions.TagsToDelete)), + } + for _, tagKey := range tagOptions.TagsToDelete { + deleteTagsInput.Tags = append(deleteTagsInput.Tags, types.Tag{Key: aws.String(tagKey)}) + } + _, deleteErr := c.ec2.DeleteTags(ctx, deleteTagsInput) + if deleteErr != nil { + klog.ErrorS(deleteErr, "failed to delete tags", "volumeID", volumeID) + return deleteErr + } + } + if len(tagOptions.TagsToAdd) > 0 { + createTagsInput := &ec2.CreateTagsInput{ + Resources: []string{volumeID}, + Tags: make([]types.Tag, 0, len(tagOptions.TagsToAdd)), + } + for k, v := range tagOptions.TagsToAdd { + createTagsInput.Tags = append(createTagsInput.Tags, types.Tag{ + Key: aws.String(k), + Value: aws.String(v), + }) + } + _, addErr := c.ec2.CreateTags(ctx, createTagsInput) + if addErr != nil { + klog.ErrorS(addErr, "failed to create tags", "volumeID", volumeID) + return addErr + } + } + return nil +} + // ResizeOrModifyDisk resizes an EBS volume in GiB increments, rounding up to the next possible allocatable unit, and/or modifies an EBS // volume with the parameters in ModifyDiskOptions. // The resizing operation is performed only when newSizeBytes != 0. diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index f0142510eb..a6df1d9750 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -2509,6 +2509,121 @@ func TestResizeOrModifyDisk(t *testing.T) { } } +func TestModifyTags(t *testing.T) { + validTagsToAddInput := map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "", + } + + validTagsToDeleteInput := []string{ + "key1", + "key2", + } + + emptyTagsToAddInput := map[string]string{} + emptyTagsToDeleteInput := []string{} + + testCases := []struct { + name string + volumeID string + negativeCase bool + modifyDiskOptions *ModifyDiskOptions + modifyTagsOptions ModifyTagsOptions + expErr error + mockEC2 *MockEC2API + volumes []types.Volume + }{ + { + name: "success normal tag addition", + volumeID: "mod-tag-test-name", + modifyTagsOptions: ModifyTagsOptions{ + TagsToAdd: validTagsToAddInput, + }, + expErr: nil, + }, + { + name: "success normal tag deletion", + volumeID: "mod-tag-test-name", + modifyTagsOptions: ModifyTagsOptions{ + TagsToDelete: validTagsToDeleteInput, + }, + expErr: nil, + }, + { + name: "success normal tag addition and tag deletion", + volumeID: "mod-tag-test-name", + modifyTagsOptions: ModifyTagsOptions{ + TagsToAdd: validTagsToAddInput, + TagsToDelete: validTagsToDeleteInput, + }, + expErr: nil, + }, + { + name: "fail: EC2 API generic error TagsToAdd", + volumeID: "mod-tag-test-name", + negativeCase: true, + expErr: fmt.Errorf("Generic EC2 API error"), + modifyTagsOptions: ModifyTagsOptions{ + TagsToAdd: validTagsToAddInput, + TagsToDelete: emptyTagsToDeleteInput, + }, + }, + { + name: "fail: EC2 API generic error TagsToDelete", + volumeID: "mod-tag-test-name", + negativeCase: true, + expErr: fmt.Errorf("Generic EC2 API error"), + modifyTagsOptions: ModifyTagsOptions{ + TagsToAdd: emptyTagsToAddInput, + TagsToDelete: validTagsToDeleteInput, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockEC2 := NewMockEC2API(mockCtrl) + c := newCloud(mockEC2) + + ctx := context.Background() + + if len(tc.modifyTagsOptions.TagsToAdd) > 0 { + if tc.negativeCase { + mockEC2.EXPECT().CreateTags(gomock.Any(), gomock.Any()).Return(nil, tc.expErr).Times(1) + } else { + mockEC2.EXPECT().CreateTags(gomock.Any(), gomock.Any()).Return(&ec2.CreateTagsOutput{}, tc.expErr).Times(1) + } + } + if len(tc.modifyTagsOptions.TagsToDelete) > 0 { + if tc.negativeCase { + mockEC2.EXPECT().DeleteTags(gomock.Any(), gomock.Any()).Return(nil, tc.expErr).Times(1) + } else { + mockEC2.EXPECT().DeleteTags(gomock.Any(), gomock.Any()).Return(&ec2.DeleteTagsOutput{}, tc.expErr).Times(1) + } + } + + err := c.ModifyTags(ctx, tc.volumeID, tc.modifyTagsOptions) + if err != nil { + if tc.expErr == nil { + t.Fatalf("ModifyTags() failed: expected no error, got: %v", err) + } else { + if !strings.Contains(err.Error(), tc.expErr.Error()) { + t.Fatalf("ModifyTags() failed: expected error %v, got: %v", tc.expErr, err) + } + } + } else { + if tc.expErr != nil { + t.Fatal("ModifyTags() failed: expected error, got nothing") + } + } + + mockCtrl.Finish() + }) + } +} + func TestGetSnapshotByName(t *testing.T) { testCases := []struct { name string diff --git a/pkg/cloud/ec2_interface.go b/pkg/cloud/ec2_interface.go index 897c42b717..f7dad5d7ea 100644 --- a/pkg/cloud/ec2_interface.go +++ b/pkg/cloud/ec2_interface.go @@ -35,5 +35,6 @@ type EC2API interface { DescribeVolumesModifications(ctx context.Context, params *ec2.DescribeVolumesModificationsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeVolumesModificationsOutput, error) DescribeTags(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) CreateTags(ctx context.Context, params *ec2.CreateTagsInput, optFns ...func(*ec2.Options)) (*ec2.CreateTagsOutput, error) + DeleteTags(ctx context.Context, params *ec2.DeleteTagsInput, optFns ...func(*ec2.Options)) (*ec2.DeleteTagsOutput, error) EnableFastSnapshotRestores(ctx context.Context, params *ec2.EnableFastSnapshotRestoresInput, optFns ...func(*ec2.Options)) (*ec2.EnableFastSnapshotRestoresOutput, error) } diff --git a/pkg/cloud/interface.go b/pkg/cloud/interface.go index 97b95fdc9b..eaa3d35ed3 100644 --- a/pkg/cloud/interface.go +++ b/pkg/cloud/interface.go @@ -26,6 +26,7 @@ type Cloud interface { DeleteDisk(ctx context.Context, volumeID string) (success bool, err error) AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error) DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error) + ModifyTags(ctx context.Context, volumeID string, tagOptions ModifyTagsOptions) (err error) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int32, err error) WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*types.VolumeAttachment, error) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error) diff --git a/pkg/cloud/metadata/k8s.go b/pkg/cloud/metadata/k8s.go index 4c4e875115..a85e6d8f14 100644 --- a/pkg/cloud/metadata/k8s.go +++ b/pkg/cloud/metadata/k8s.go @@ -27,56 +27,70 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/cert" "k8s.io/klog/v2" ) type KubernetesAPIClient func() (kubernetes.Interface, error) -var DefaultKubernetesAPIClient = func() (kubernetes.Interface, error) { - // creates the in-cluster config - config, err := rest.InClusterConfig() - if err != nil { - if errors.Is(err, os.ErrNotExist) { - klog.InfoS("InClusterConfig failed to read token file, retrieving file from sandbox mount point") - // CONTAINER_SANDBOX_MOUNT_POINT env is set upon container creation in containerd v1.6+ - // it provides the absolute host path to the container volume. - sandboxMountPoint := os.Getenv("CONTAINER_SANDBOX_MOUNT_POINT") - if sandboxMountPoint == "" { - return nil, fmt.Errorf("CONTAINER_SANDBOX_MOUNT_POINT environment variable is not set") - } - - tokenFile := filepath.Join(sandboxMountPoint, "var", "run", "secrets", "kubernetes.io", "serviceaccount", "token") - rootCAFile := filepath.Join(sandboxMountPoint, "var", "run", "secrets", "kubernetes.io", "serviceaccount", "ca.crt") - - token, tokenErr := os.ReadFile(tokenFile) +func DefaultKubernetesAPIClient(kubeconfig string) KubernetesAPIClient { + return func() (clientset kubernetes.Interface, err error) { + var config *rest.Config + if kubeconfig != "" { + config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, + &clientcmd.ConfigOverrides{}, + ).ClientConfig() if err != nil { - return nil, tokenErr - } - - tlsClientConfig := rest.TLSClientConfig{} - if _, certErr := cert.NewPool(rootCAFile); err != nil { - return nil, fmt.Errorf("expected to load root CA config from %s, but got err: %w", rootCAFile, certErr) - } else { - tlsClientConfig.CAFile = rootCAFile - } - - config = &rest.Config{ - Host: "https://" + net.JoinHostPort(os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")), - TLSClientConfig: tlsClientConfig, - BearerToken: string(token), - BearerTokenFile: tokenFile, + return nil, err } } else { + // creates the in-cluster config + config, err = rest.InClusterConfig() + if err != nil { + if errors.Is(err, os.ErrNotExist) { + klog.InfoS("InClusterConfig failed to read token file, retrieving file from sandbox mount point") + // CONTAINER_SANDBOX_MOUNT_POINT env is set upon container creation in containerd v1.6+ + // it provides the absolute host path to the container volume. + sandboxMountPoint := os.Getenv("CONTAINER_SANDBOX_MOUNT_POINT") + if sandboxMountPoint == "" { + return nil, fmt.Errorf("CONTAINER_SANDBOX_MOUNT_POINT environment variable is not set") + } + + tokenFile := filepath.Join(sandboxMountPoint, "var", "run", "secrets", "kubernetes.io", "serviceaccount", "token") + rootCAFile := filepath.Join(sandboxMountPoint, "var", "run", "secrets", "kubernetes.io", "serviceaccount", "ca.crt") + + token, tokenErr := os.ReadFile(tokenFile) + if err != nil { + return nil, tokenErr + } + + tlsClientConfig := rest.TLSClientConfig{} + if _, certErr := cert.NewPool(rootCAFile); err != nil { + return nil, fmt.Errorf("expected to load root CA config from %s, but got err: %w", rootCAFile, certErr) + } else { + tlsClientConfig.CAFile = rootCAFile + } + + config = &rest.Config{ + Host: "https://" + net.JoinHostPort(os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")), + TLSClientConfig: tlsClientConfig, + BearerToken: string(token), + BearerTokenFile: tokenFile, + } + } else { + return nil, err + } + } + } + // creates the clientset + clientset, err = kubernetes.NewForConfig(config) + if err != nil { return nil, err } + return clientset, nil } - // creates the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return clientset, nil } func KubernetesAPIInstanceInfo(clientset kubernetes.Interface) (*Metadata, error) { diff --git a/pkg/cloud/mock_cloud.go b/pkg/cloud/mock_cloud.go index d6e7a36c46..2cc08c9d3d 100644 --- a/pkg/cloud/mock_cloud.go +++ b/pkg/cloud/mock_cloud.go @@ -244,6 +244,20 @@ func (mr *MockCloudMockRecorder) ListSnapshots(ctx, volumeID, maxResults, nextTo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSnapshots", reflect.TypeOf((*MockCloud)(nil).ListSnapshots), ctx, volumeID, maxResults, nextToken) } +// ModifyTags mocks base method. +func (m *MockCloud) ModifyTags(ctx context.Context, volumeID string, tagOptions ModifyTagsOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyTags", ctx, volumeID, tagOptions) + ret0, _ := ret[0].(error) + return ret0 +} + +// ModifyTags indicates an expected call of ModifyTags. +func (mr *MockCloudMockRecorder) ModifyTags(ctx, volumeID, tagOptions interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyTags", reflect.TypeOf((*MockCloud)(nil).ModifyTags), ctx, volumeID, tagOptions) +} + // ResizeOrModifyDisk mocks base method. func (m *MockCloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int32, error) { m.ctrl.T.Helper() diff --git a/pkg/cloud/mock_ec2.go b/pkg/cloud/mock_ec2.go index c9b704ea8d..0aa38267b2 100644 --- a/pkg/cloud/mock_ec2.go +++ b/pkg/cloud/mock_ec2.go @@ -149,6 +149,26 @@ func (mr *MockEC2APIMockRecorder) DeleteSnapshot(ctx, params interface{}, optFns return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSnapshot", reflect.TypeOf((*MockEC2API)(nil).DeleteSnapshot), varargs...) } +// DeleteTags mocks base method. +func (m *MockEC2API) DeleteTags(ctx context.Context, params *ec2.DeleteTagsInput, optFns ...func(*ec2.Options)) (*ec2.DeleteTagsOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteTags", varargs...) + ret0, _ := ret[0].(*ec2.DeleteTagsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteTags indicates an expected call of DeleteTags. +func (mr *MockEC2APIMockRecorder) DeleteTags(ctx, params interface{}, optFns ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTags", reflect.TypeOf((*MockEC2API)(nil).DeleteTags), varargs...) +} + // DeleteVolume mocks base method. func (m *MockEC2API) DeleteVolume(ctx context.Context, params *ec2.DeleteVolumeInput, optFns ...func(*ec2.Options)) (*ec2.DeleteVolumeOutput, error) { m.ctrl.T.Helper() diff --git a/pkg/coalescer/coalescer.go b/pkg/coalescer/coalescer.go index 3eb0a82edd..db4e23f5c3 100644 --- a/pkg/coalescer/coalescer.go +++ b/pkg/coalescer/coalescer.go @@ -31,7 +31,7 @@ import ( // When the delay on the request expires (determined by the time the first request comes in), the merged // input is passed to the execution function, and the result to all waiting callers (those that were // not rejected during the merge step) -type Coalescer[InputType comparable, ResultType any] interface { +type Coalescer[InputType any, ResultType any] interface { // Coalesce is a function to coalesce a given input // key = only requests with this same key will be coalesced (such as volume ID) // input = input to merge with other inputs @@ -46,7 +46,7 @@ type Coalescer[InputType comparable, ResultType any] interface { // (should return an error if the new input cannot be combined with the existing inputs, // otherwise return the new merged input) // executeFunction = the function to call when the delay expires -func New[InputType comparable, ResultType any](delay time.Duration, +func New[InputType any, ResultType any](delay time.Duration, mergeFunction func(input InputType, existing InputType) (InputType, error), executeFunction func(key string, input InputType) (ResultType, error), ) Coalescer[InputType, ResultType] { @@ -71,19 +71,19 @@ type result[ResultType any] struct { // Type to send inputs from Coalesce() to coalescerThread() via channel // Includes a return channel for the result -type newInput[InputType comparable, ResultType any] struct { +type newInput[InputType any, ResultType any] struct { key string input InputType resultChannel chan result[ResultType] } // Type to store pending inputs in the input map -type pendingInput[InputType comparable, ResultType any] struct { +type pendingInput[InputType any, ResultType any] struct { input InputType resultChannels []chan result[ResultType] } -type coalescer[InputType comparable, ResultType any] struct { +type coalescer[InputType any, ResultType any] struct { delay time.Duration mergeFunction func(input InputType, existing InputType) (InputType, error) executeFunction func(key string, input InputType) (ResultType, error) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 8b2d1605db..93d0d75de6 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -217,14 +217,14 @@ func (d *ControllerService) CreateVolume(ctx context.Context, req *csi.CreateVol // "Values specified in mutable_parameters MUST take precedence over the values from parameters." // https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume - if modifyOptions.VolumeType != "" { - volumeType = modifyOptions.VolumeType + if modifyOptions.modifyDiskOptions.VolumeType != "" { + volumeType = modifyOptions.modifyDiskOptions.VolumeType } - if modifyOptions.IOPS != 0 { - iops = modifyOptions.IOPS + if modifyOptions.modifyDiskOptions.IOPS != 0 { + iops = modifyOptions.modifyDiskOptions.IOPS } - if modifyOptions.Throughput != 0 { - throughput = modifyOptions.Throughput + if modifyOptions.modifyDiskOptions.Throughput != 0 { + throughput = modifyOptions.modifyDiskOptions.Throughput } responseCtx := map[string]string{} @@ -592,7 +592,8 @@ func (d *ControllerService) ControllerModifyVolume(ctx context.Context, req *csi } _, err = d.modifyVolumeCoalescer.Coalesce(volumeID, modifyVolumeRequest{ - modifyDiskOptions: *options, + modifyDiskOptions: options.modifyDiskOptions, + modifyTagsOptions: options.modifyTagsOptions, }) if err != nil { return nil, err diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index 94d8aeb85f..cbfceaa7c2 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -20,7 +20,9 @@ import ( "context" "errors" "fmt" + "reflect" "strconv" + "strings" "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" @@ -39,11 +41,16 @@ const ( ModificationKeyIOPS = "iops" ModificationKeyThroughput = "throughput" + + ModificationAddTag = "tagSpecification" + + ModificationDeleteTag = "tagDeletion" ) type modifyVolumeRequest struct { newSize int64 modifyDiskOptions cloud.ModifyDiskOptions + modifyTagsOptions cloud.ModifyTagsOptions } func (d *ControllerService) GetCSIDriverModificationCapability( @@ -68,9 +75,7 @@ func (d *ControllerService) ModifyVolumeProperties( return nil, err } - _, err = d.modifyVolumeCoalescer.Coalesce(name, modifyVolumeRequest{ - modifyDiskOptions: *options, - }) + _, err = d.modifyVolumeCoalescer.Coalesce(name, *options) if err != nil { return nil, err } @@ -107,30 +112,62 @@ func mergeModifyVolumeRequest(input modifyVolumeRequest, existing modifyVolumeRe } existing.modifyDiskOptions.VolumeType = input.modifyDiskOptions.VolumeType } - + if len(input.modifyTagsOptions.TagsToAdd) > 0 || len(input.modifyTagsOptions.TagsToDelete) > 0 { + if (len(existing.modifyTagsOptions.TagsToAdd) > 0 || len(existing.modifyTagsOptions.TagsToDelete) > 0) && !(reflect.DeepEqual(input.modifyTagsOptions, existing.modifyTagsOptions)) { + return existing, fmt.Errorf("Different tags were requested by a previous request. Current: %v, Requested: %v", existing.modifyTagsOptions, input.modifyTagsOptions) + } + existing.modifyTagsOptions = cloud.ModifyTagsOptions{ + TagsToAdd: input.modifyTagsOptions.TagsToAdd, + TagsToDelete: input.modifyTagsOptions.TagsToDelete, + } + } return existing, nil } +func executeModifyTagsRequest(volumeID string, options modifyVolumeRequest, c cloud.Cloud, ctx context.Context) error { + if len(options.modifyTagsOptions.TagsToAdd) > 0 || len(options.modifyTagsOptions.TagsToDelete) > 0 { + err := c.ModifyTags(ctx, volumeID, options.modifyTagsOptions) + if err != nil { + if errors.Is(err, cloud.ErrInvalidArgument) { + return status.Errorf(codes.InvalidArgument, "Could not modify volume tags (invalid argument) %q: %v", volumeID, err) + } + return status.Errorf(codes.Internal, "Could not modify volume tags %q: %v", volumeID, err) + } + } + return nil +} + func executeModifyVolumeRequest(c cloud.Cloud) func(string, modifyVolumeRequest) (int32, error) { return func(volumeID string, req modifyVolumeRequest) (int32, error) { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - actualSizeGiB, err := c.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + err := executeModifyTagsRequest(volumeID, req, c, ctx) if err != nil { - // Kubernetes sidecars treats "Invalid Argument" errors as infeasible and retries less aggressively - if errors.Is(err, cloud.ErrInvalidArgument) { - return 0, status.Errorf(codes.InvalidArgument, "Could not modify volume (invalid argument) %q: %v", volumeID, err) + return 0, err + } + if (req.modifyDiskOptions.IOPS != 0) || (req.modifyDiskOptions.Throughput != 0) || (req.modifyDiskOptions.VolumeType != "") || (req.newSize != 0) { + actualSizeGiB, err := c.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + if err != nil { + if errors.Is(err, cloud.ErrInvalidArgument) { + return 0, status.Errorf(codes.InvalidArgument, "Could not modify volume (invalid argument) %q: %v", volumeID, err) + } + return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) + } else { + return actualSizeGiB, nil } - return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) - } else { - return actualSizeGiB, nil } + // No change to the volume was requested, so return an empty result with no error + return 0, nil } } -func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOptions, error) { - options := cloud.ModifyDiskOptions{} - +func parseModifyVolumeParameters(params map[string]string) (*modifyVolumeRequest, error) { + options := modifyVolumeRequest{ + modifyTagsOptions: cloud.ModifyTagsOptions{ + TagsToAdd: make(map[string]string), + TagsToDelete: make([]string, 0), + }, + } for key, value := range params { switch key { case ModificationKeyIOPS: @@ -138,24 +175,36 @@ func parseModifyVolumeParameters(params map[string]string) (*cloud.ModifyDiskOpt if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Could not parse IOPS: %q", value) } - options.IOPS = int32(iops) + options.modifyDiskOptions.IOPS = int32(iops) case ModificationKeyThroughput: throughput, err := strconv.Atoi(value) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "Could not parse throughput: %q", value) } - options.Throughput = int32(throughput) + options.modifyDiskOptions.Throughput = int32(throughput) case DeprecatedModificationKeyVolumeType: if _, ok := params[ModificationKeyVolumeType]; ok { klog.Infof("Ignoring deprecated key `volumeType` because preferred key `type` is present") continue } klog.InfoS("Key `volumeType` is deprecated, please use `type` instead") - options.VolumeType = value + options.modifyDiskOptions.VolumeType = value case ModificationKeyVolumeType: - options.VolumeType = value + options.modifyDiskOptions.VolumeType = value + default: + if strings.HasPrefix(key, ModificationAddTag) { + st := strings.SplitN(value, "=", 2) + if len(st) < 2 { + return nil, status.Errorf(codes.InvalidArgument, "Invalid tag specification: %v", st) + } + options.modifyTagsOptions.TagsToAdd[st[0]] = st[1] + } else if strings.HasPrefix(key, ModificationDeleteTag) { + options.modifyTagsOptions.TagsToDelete = append(options.modifyTagsOptions.TagsToDelete, value) + } } } - + if err := validateExtraTags(options.modifyTagsOptions.TagsToAdd, false); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid tag value: %v", err) + } return &options, nil } diff --git a/pkg/driver/controller_modify_volume_test.go b/pkg/driver/controller_modify_volume_test.go index c12ae31eee..5e5eea68da 100644 --- a/pkg/driver/controller_modify_volume_test.go +++ b/pkg/driver/controller_modify_volume_test.go @@ -25,26 +25,36 @@ import ( ) const ( - validType = "gp3" - validIops = "2000" - validIopsInt = 2000 - validThroughput = "500" - validThroughputInt = 500 - invalidIops = "123.546" - invalidThroughput = "one hundred" + validType = "gp3" + validIops = "2000" + validIopsInt = 2000 + validThroughput = "500" + validThroughputInt = 500 + invalidIops = "123.546" + invalidThroughput = "one hundred" + validTagSpecificationInput = "key1=tag1" + tagSpecificationWithNoValue = "key3=" + tagSpecificationWithNoEqual = "key1" + validTagDeletion = "key2" + invalidTagSpecification = "aws:test=TEST" ) func TestParseModifyVolumeParameters(t *testing.T) { testCases := []struct { name string params map[string]string - expectedOptions *cloud.ModifyDiskOptions + expectedOptions *modifyVolumeRequest expectError bool }{ { - name: "blank params", - params: map[string]string{}, - expectedOptions: &cloud.ModifyDiskOptions{}, + name: "blank params", + params: map[string]string{}, + expectedOptions: &modifyVolumeRequest{ + modifyTagsOptions: cloud.ModifyTagsOptions{ + TagsToAdd: map[string]string{}, + TagsToDelete: []string{}, + }, + }, }, { name: "basic params", @@ -52,12 +62,31 @@ func TestParseModifyVolumeParameters(t *testing.T) { ModificationKeyVolumeType: validType, ModificationKeyIOPS: validIops, ModificationKeyThroughput: validThroughput, + ModificationAddTag: validTagSpecificationInput, + ModificationDeleteTag: validTagDeletion, + }, + expectedOptions: &modifyVolumeRequest{ + modifyDiskOptions: cloud.ModifyDiskOptions{ + VolumeType: validType, + IOPS: validIopsInt, + Throughput: validThroughputInt, + }, + modifyTagsOptions: cloud.ModifyTagsOptions{ + TagsToAdd: map[string]string{ + "key1": "tag1", + }, + TagsToDelete: []string{ + "key2", + }, + }, }, - expectedOptions: &cloud.ModifyDiskOptions{ - VolumeType: validType, - IOPS: validIopsInt, - Throughput: validThroughputInt, + }, + { + name: "tag specification with inproper length", + params: map[string]string{ + ModificationAddTag: tagSpecificationWithNoEqual, }, + expectError: true, }, { name: "deprecated type", @@ -65,8 +94,14 @@ func TestParseModifyVolumeParameters(t *testing.T) { ModificationKeyVolumeType: validType, DeprecatedModificationKeyVolumeType: "deprecated" + validType, }, - expectedOptions: &cloud.ModifyDiskOptions{ - VolumeType: validType, + expectedOptions: &modifyVolumeRequest{ + modifyDiskOptions: cloud.ModifyDiskOptions{ + VolumeType: validType, + }, + modifyTagsOptions: cloud.ModifyTagsOptions{ + TagsToAdd: map[string]string{}, + TagsToDelete: []string{}, + }, }, }, { @@ -83,6 +118,13 @@ func TestParseModifyVolumeParameters(t *testing.T) { }, expectError: true, }, + { + name: "invalid tag specification", + params: map[string]string{ + ModificationAddTag: invalidTagSpecification, + }, + expectError: true, + }, } for _, tc := range testCases { diff --git a/pkg/driver/options.go b/pkg/driver/options.go index d06f987866..5dc13c4b43 100644 --- a/pkg/driver/options.go +++ b/pkg/driver/options.go @@ -28,6 +28,10 @@ import ( type Options struct { Mode Mode + // Kubeconfig is an absolute path to a kubeconfig file. + // If empty, the in-cluster config will be loaded. + Kubeconfig string + // #### Server options #### //Endpoint is the endpoint for the CSI driver server @@ -86,6 +90,8 @@ type Options struct { } func (o *Options) AddFlags(f *flag.FlagSet) { + f.StringVar(&o.Kubeconfig, "kubeconfig", "", "Absolute path to a kubeconfig file. The default is the emtpy string, which causes the in-cluster config to be used") + // Server options f.StringVar(&o.Endpoint, "endpoint", DefaultCSIEndpoint, "Endpoint for the CSI driver server") f.StringVar(&o.HttpEndpoint, "http-endpoint", "", "The TCP network address where the HTTP server for metrics will listen (example: `:8080`). The default is empty string, which means the server is disabled.") diff --git a/pkg/driver/request_coalescing_test.go b/pkg/driver/request_coalescing_test.go index 326ebc689c..8e700b7354 100644 --- a/pkg/driver/request_coalescing_test.go +++ b/pkg/driver/request_coalescing_test.go @@ -240,7 +240,6 @@ func testPartialFail(t *testing.T, executor modifyVolumeExecutor) { volumeTypeChosen = options.VolumeType return newSize, nil }) - options := &Options{ ModifyVolumeRequestHandlerTimeout: 2 * time.Second, } @@ -334,7 +333,6 @@ func testSequentialRequests(t *testing.T, executor modifyVolumeExecutor) { options: options, modifyVolumeCoalescer: newModifyVolumeCoalescer(mockCloud, options), } - var wg sync.WaitGroup wg.Add(2) diff --git a/tests/e2e/modify_volume.go b/tests/e2e/modify_volume.go index 62c92002a3..b9fc348a01 100644 --- a/tests/e2e/modify_volume.go +++ b/tests/e2e/modify_volume.go @@ -60,14 +60,34 @@ var ( ShouldResizeVolume: false, ShouldTestInvalidModificationRecovery: false, }, - "with new throughput and iops annotations": { + "with a new tag annotation": { + CreateVolumeParameters: defaultModifyVolumeTestGp3CreateVolumeParameters, + ModifyVolumeAnnotations: map[string]string{ + testsuites.AnnotationsTagSpec: "key1=test1", + }, + ShouldResizeVolume: false, + ShouldTestInvalidModificationRecovery: false, + ExternalResizerOnly: true, + }, + "with new throughput, and iops annotations": { + CreateVolumeParameters: defaultModifyVolumeTestGp3CreateVolumeParameters, + ModifyVolumeAnnotations: map[string]string{ + testsuites.AnnotationIops: "4000", + testsuites.AnnotationThroughput: "150", + }, + ShouldResizeVolume: false, + ShouldTestInvalidModificationRecovery: false, + }, + "with new throughput, iops, and tag annotations": { CreateVolumeParameters: defaultModifyVolumeTestGp3CreateVolumeParameters, ModifyVolumeAnnotations: map[string]string{ testsuites.AnnotationIops: "4000", testsuites.AnnotationThroughput: "150", + testsuites.AnnotationsTagSpec: "key2=test2", }, ShouldResizeVolume: false, ShouldTestInvalidModificationRecovery: false, + ExternalResizerOnly: true, }, "with a larger size and new throughput and iops annotations": { CreateVolumeParameters: defaultModifyVolumeTestGp3CreateVolumeParameters, @@ -124,6 +144,9 @@ var _ = Describe("[ebs-csi-e2e] [single-az] [modify-volume] Modifying a PVC", fu modifyVolumeTest := modifyVolumeTest Context(testName, func() { It("will modify associated PV and EBS Volume via volume-modifier-for-k8s", func() { + if modifyVolumeTest.ExternalResizerOnly { + Skip("Functionality being tested is not supported for Modification via volume-modifier-for-k8s, skipping test") + } modifyVolumeTest.Run(cs, ns, ebsDriver, testsuites.VolumeModifierForK8s) }) It("will modify associated PV and EBS Volume via external-resizer", func() { diff --git a/tests/e2e/testsuites/e2e_utils.go b/tests/e2e/testsuites/e2e_utils.go index 4bcfdec20c..e69d7d4b1d 100644 --- a/tests/e2e/testsuites/e2e_utils.go +++ b/tests/e2e/testsuites/e2e_utils.go @@ -42,6 +42,8 @@ const ( AnnotationIops = "ebs.csi.aws.com/iops" AnnotationThroughput = "ebs.csi.aws.com/throughput" AnnotationVolumeType = "ebs.csi.aws.com/volumeType" + AnnotationsTagSpec = "ebs.csi.aws.com/tagSpecification" + AnnotationTagDel = "ebs.csi.aws.com/tagDeletion" ) var DefaultGeneratedVolumeMount = VolumeMountDetails{ diff --git a/tests/e2e/testsuites/modify_volume_tester.go b/tests/e2e/testsuites/modify_volume_tester.go index daf9b46ff4..d8d4015a5f 100644 --- a/tests/e2e/testsuites/modify_volume_tester.go +++ b/tests/e2e/testsuites/modify_volume_tester.go @@ -34,6 +34,7 @@ type ModifyVolumeTest struct { ModifyVolumeAnnotations map[string]string ShouldResizeVolume bool ShouldTestInvalidModificationRecovery bool + ExternalResizerOnly bool } var (