Skip to content

Commit

Permalink
refactor: improve support for shared block devices
Browse files Browse the repository at this point in the history
Shared volumeIDs are now independent of the nodes they are created on.

Signed-off-by: Serge Logvinov <[email protected]>
  • Loading branch information
sergelogvinov committed Jun 11, 2024
1 parent 5bf0677 commit 1a65309
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.57.1
version: v1.59.1
args: --timeout=5m --config=.golangci.yml
- name: Unit
run: make unit
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
go-version-file: 'go.mod'

- name: Generate token
uses: actions/create-github-app-token@a0de6af83968303c8c955486bf9739a57d23c7f1 # v1.10.0
uses: actions/create-github-app-token@c8f55efbd427e7465d6da1106e7979bc8aaee856 # v1.10.1
id: token
with:
app-id: "${{ secrets.BOT_APP_ID }}"
Expand Down
25 changes: 8 additions & 17 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ run:
# concurrency: 4

# timeout for analysis, e.g. 30s, 5m, default is 1m
timeout: 10m
deadline: 10m

# exit code when at least one issue was found, default is 1
issues-exit-code: 1
Expand Down Expand Up @@ -58,7 +58,7 @@ linters-settings:
simplify: true
gocyclo:
# minimal code complexity to report, 30 by default (but we recommend 10-20)
min-complexity: 20
min-complexity: 30
maligned:
# print struct with more effective memory layout or not, false by default
suggest-new: true
Expand Down Expand Up @@ -125,7 +125,7 @@ linters-settings:
- prefix(k8s.io) # Groups all imports with the specified Prefix.
cyclop:
# the maximal code complexity to report
max-complexity: 20
max-complexity: 30
gomoddirectives:
replace-local: true
replace-allow-list: []
Expand All @@ -138,26 +138,24 @@ linters:
- depguard
- errorlint
- exhaustruct
- exhaustivestruct
- err113
- forbidigo
- forcetypeassert
- funlen
- gas
- gochecknoglobals
- gochecknoinits
- gocognit
- godox
- godot
- goerr113
- gomnd
- ifshort
- gosec
- mnd
- ireturn # we return interfaces
- maintidx
- nestif
- nilnil # we return "nil, nil"
- nonamedreturns
- nolintlint
- nosnakecase
- paralleltest
- promlinter # https://github.com/golangci/golangci-lint/issues/2222
- tagliatelle # we have many different conventions
Expand All @@ -169,21 +167,14 @@ linters:
- varnamelen # too annoying
- wrapcheck
- perfsprint
- protogetter

# temporarily disabled linters
- copyloopvar
- intrange

# abandoned linters for which golangci shows the warning that the repo is archived by the owner
- golint
- interfacer
- maligned
- scopelint
- varcheck
- structcheck
- deadcode
- ifshort
- perfsprint
- execinquery

disable-all: false
fast: false
Expand Down
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
base:
image: k8s.gcr.io/pause:3.6
Expand All @@ -24,7 +23,7 @@ services:
source: ./
target: /src
csi-attacher:
image: registry.k8s.io/sig-storage/csi-attacher:v4.4.3
image: registry.k8s.io/sig-storage/csi-attacher:v4.4.4
network_mode: "service:base"
command:
- "--v=5"
Expand All @@ -40,7 +39,7 @@ services:
source: ./hack
target: /etc/kubernetes
csi-resizer:
image: registry.k8s.io/sig-storage/csi-resizer:v1.9.3
image: registry.k8s.io/sig-storage/csi-resizer:v1.9.4
network_mode: "service:base"
command:
- "--v=5"
Expand All @@ -56,7 +55,7 @@ services:
source: ./hack
target: /etc/kubernetes
csi-provisioner:
image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.3
image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.4
network_mode: "service:base"
command:
- "--v=5"
Expand Down
38 changes: 26 additions & 12 deletions pkg/csi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,13 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
return nil, status.Error(codes.AlreadyExists, "volume already exists with same name and different capacity")
}

volID := vol.VolumeID()
if storageConfig["shared"] != nil && int(storageConfig["shared"].(float64)) == 1 {
volID = vol.VolumeSharedID()
}

volume := csi.Volume{
VolumeId: vol.VolumeID(),
VolumeId: volID,
VolumeContext: params,
ContentSource: request.GetVolumeContentSource(),
CapacityBytes: int64(volSizeGB * 1024 * 1024 * 1024),
Expand Down Expand Up @@ -256,9 +261,12 @@ func (d *ControllerService) DeleteVolume(_ context.Context, request *csi.DeleteV
return &csi.DeleteVolumeResponse{}, nil
}

vmr := pxapi.NewVmRef(vmID)
vmr.SetNode(vol.Node())
vmr.SetVmType("qemu")
vmr, err := getVMRefByVolume(cl, vol)
if err != nil {
klog.Errorf("failed to get vm ref by volume: %s, %v", vol.Disk(), err)

return nil, status.Error(codes.Internal, err.Error())
}

if _, err := cl.DeleteVolume(vmr, vol.Storage(), vol.Disk()); err != nil {
klog.Errorf("failed to delete volume: %s", vol.Disk())
Expand Down Expand Up @@ -305,7 +313,7 @@ func (d *ControllerService) ControllerPublishVolume(ctx context.Context, request
return nil, status.Error(codes.InvalidArgument, "NodeID must be provided")
}

if request.VolumeCapability == nil {
if request.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "VolumeCapability must be provided")
}

Expand Down Expand Up @@ -344,16 +352,18 @@ func (d *ControllerService) ControllerPublishVolume(ctx context.Context, request
vmr.SetVmType("qemu")
}

// if vmr.Node() != vol.Node() {
// return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("volume %s does not exist on the node %s", volumeID, nodeID))
// }
if vol.Zone() == "" {
vol = volume.NewVolume(vol.Region(), vmr.Node(), vol.Storage(), vol.Disk())
}

klog.V(4).Infof("ControllerPublishVolume: vol=%+v vmrid=%+v", vol, vmr)

options := map[string]string{
"backup": "0",
"iothread": "1",
}

if request.Readonly {
if request.GetReadonly() {
options["ro"] = "1"
}

Expand Down Expand Up @@ -485,8 +495,8 @@ func (d *ControllerService) GetCapacity(_ context.Context, request *csi.GetCapac

topology := request.GetAccessibleTopology()
if topology != nil {
region := topology.Segments[corev1.LabelTopologyRegion]
zone := topology.Segments[corev1.LabelTopologyZone]
region := topology.GetSegments()[corev1.LabelTopologyRegion]
zone := topology.GetSegments()[corev1.LabelTopologyZone]
storageName := request.GetParameters()[StorageIDKey]

if region == "" || zone == "" || storageName == "" {
Expand Down Expand Up @@ -629,13 +639,17 @@ func (d *ControllerService) ControllerExpandVolume(_ context.Context, request *c
continue
}

if vm["node"].(string) == vol.Node() {
if vm["node"].(string) == vol.Node() || vol.Node() == "" {
vmID := int(vm["vmid"].(float64))

vmr := pxapi.NewVmRef(vmID)
vmr.SetNode(vol.Node())
vmr.SetVmType("qemu")

if vmr.Node() == "" {
vmr.SetNode(vm["node"].(string))
}

config, err := cl.GetVmConfig(vmr)
if err != nil {
klog.Errorf("failed to get vm config: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ func (ts *csiTestSuite) TestControllerServiceControllerGetCapabilities() {
ts.Require().NoError(err)
ts.Require().NotNil(resp)

if len(resp.Capabilities) != 6 {
ts.T().Fatalf("unexpected number of capabilities: %d", len(resp.Capabilities))
if len(resp.GetCapabilities()) != 6 {
ts.T().Fatalf("unexpected number of capabilities: %d", len(resp.GetCapabilities()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestGetPluginInfo(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, resp)

assert.Equal(t, resp.Name, csi.DriverName)
assert.Equal(t, resp.VendorVersion, csi.DriverVersion)
assert.Equal(t, resp.GetName(), csi.DriverName)
assert.Equal(t, resp.GetVendorVersion(), csi.DriverVersion)
}

func TestGetPluginCapabilities(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/csi/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func (n *NodeService) NodeStageVolume(_ context.Context, request *csi.NodeStageV
fsType := FSTypeExt4

if mnt := volumeCapability.GetMount(); mnt != nil {
if mnt.FsType != "" {
fsType = mnt.FsType
if mnt.GetFsType() != "" {
fsType = mnt.GetFsType()
}

if volumeContext["ssd"] == "true" {
Expand Down Expand Up @@ -361,8 +361,8 @@ func (n *NodeService) NodePublishVolume(_ context.Context, request *csi.NodePubl
fsType := "ext4"

if mnt := volumeCapability.GetMount(); mnt != nil {
if mnt.FsType != "" {
fsType = mnt.FsType
if mnt.GetFsType() != "" {
fsType = mnt.GetFsType()
}
}

Expand Down Expand Up @@ -407,7 +407,7 @@ func (n *NodeService) NodeGetVolumeStats(_ context.Context, request *csi.NodeGet
return nil, status.Error(codes.InvalidArgument, "VolumePath must be provided")
}

exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, request.VolumePath)
exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, request.GetVolumePath())
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check whether volumePath exists: %s", err)
}
Expand Down Expand Up @@ -561,7 +561,7 @@ func (n *NodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoR
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
hasSupport := func(reqcap *csi.VolumeCapability) bool {
for _, c := range volumeCaps {
if c.GetMode() == reqcap.AccessMode.GetMode() {
if c.GetMode() == reqcap.GetAccessMode().GetMode() {
return true
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,12 @@ func TestNodeServiceNodeGetCapabilities(t *testing.T) {
assert.NotNil(t, resp.GetCapabilities())

for _, capability := range resp.GetCapabilities() {
switch capability.GetRpc().Type { //nolint:exhaustive
switch capability.GetRpc().GetType() { //nolint:exhaustive
case proto.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME:
case proto.NodeServiceCapability_RPC_EXPAND_VOLUME:
case proto.NodeServiceCapability_RPC_GET_VOLUME_STATS:
default:
t.Fatalf("Unknown capability: %v", capability.Type)
t.Fatalf("Unknown capability: %v", capability.GetType())
}
}
}
Expand Down
28 changes: 25 additions & 3 deletions pkg/csi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,33 @@ func getNodeWithStorage(cl *pxapi.Client, storageName string) (string, error) {
return "", fmt.Errorf("failed to find node with storage %s", storageName)
}

func getStorageContent(cl *pxapi.Client, vol *volume.Volume) (*storageContent, error) {
vmr := pxapi.NewVmRef(vmID)
vmr.SetNode(vol.Node())
func getVMRefByVolume(cl *pxapi.Client, vol *volume.Volume) (vmr *pxapi.VmRef, err error) {
vmr = pxapi.NewVmRef(vmID)
vmr.SetVmType("qemu")

node := vol.Node()
if node == "" {
node, err = getNodeWithStorage(cl, vol.Storage())
if err != nil {
return nil, err
}
}

if node == "" {
return nil, fmt.Errorf("failed to find node with storage %s", vol.Storage())
}

vmr.SetNode(node)

return vmr, nil
}

func getStorageContent(cl *pxapi.Client, vol *volume.Volume) (*storageContent, error) {
vmr, err := getVMRefByVolume(cl, vol)
if err != nil {
return nil, err
}

context, err := cl.GetStorageContent(vmr, vol.Storage())
if err != nil {
return nil, fmt.Errorf("failed to get storage list: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (v *Volume) VolumeID() string {
return v.region + "/" + v.zone + "/" + v.storage + "/" + v.disk
}

// VolumeSharedID function returns the shared volume magic string.
func (v *Volume) VolumeSharedID() string {
return v.region + "//" + v.storage + "/" + v.disk
}

// Region function returns the region in which the volume was created.
func (v *Volume) Region() string {
return v.region
Expand Down

0 comments on commit 1a65309

Please sign in to comment.