Commit
Merge pull request #446 from Omar007/feature/multi-node-pv
[Feature] Allow additional selector terms to be defined in storage config
k8s-ci-robot authored Dec 28, 2024

Verified: this commit was created on GitHub.com and signed with GitHub's verified signature. The key has expired.
2 parents ad20e13 + 2729447 · commit d37f066
Showing 7 changed files with 142 additions and 195 deletions.
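
Before the file-by-file diffs, a sketch of what the feature enables: each class in the provisioner's storageClassMap can now carry extra NodeSelectorTerms under a new selector key, and the discoverer merges them into every PV it creates in addition to its own node term (useful for shared disks, whose affinity cannot be changed after the PV is provisioned). The config below mirrors the test fixture added in pkg/common/common_test.go; the class name and node value are illustrative.

# storageClassMap entry with the new selector field (illustrative values)
local-storage:
  hostDir: /mnt/disks
  mountDir: /mnt/disks
  selector:
    - matchExpressions:
        - key: "kubernetes.io/hostname"
          operator: "In"
          values:
            - otherNode1
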
10 changes: 7 additions & 3 deletions helm/provisioner/templates/configmap.yaml
@@ -26,7 +26,7 @@ data:
{{- end }}
{{- if .Values.useJobForCleaning }}
useJobForCleaning: "yes"
{{- end}}
{{- end }}
{{- if .Values.tolerations }}
jobTolerations: | {{ toYaml .Values.tolerations | nindent 4 }}
{{- end }}
@@ -35,7 +35,7 @@ data:
{{- end }}
{{- if .Values.minResyncPeriod }}
minResyncPeriod: {{ .Values.minResyncPeriod | quote }}
{{- end}}
{{- end }}
storageClassMap: |
{{- range $classConfig := .Values.classes }}
{{ $classConfig.name }}:
@@ -45,7 +45,7 @@ data:
blockCleanerCommand:
{{- range $val := $classConfig.blockCleanerCommand }}
- {{ $val | quote }}
{{- end}}
{{- end }}
{{- end }}
{{- if $classConfig.volumeMode }}
volumeMode: {{ $classConfig.volumeMode }}
@@ -56,4 +56,8 @@ data:
{{- if $classConfig.namePattern }}
namePattern: {{ $classConfig.namePattern | quote }}
{{- end }}
{{- if $classConfig.selector }}
selector:
{{- toYaml $classConfig.selector | nindent 8 }}
{{- end }}
{{- end }}
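
With the template change above, the extra selector terms can be supplied per class through the chart's values. A hedged sketch, showing only the per-class keys that this template renders (other class settings omitted; values are illustrative):

classes:
  - name: local-storage
    volumeMode: Filesystem
    namePattern: "*"
    selector:
      - matchExpressions:
          - key: "kubernetes.io/hostname"
            operator: "In"
            values:
              - otherNode1
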
49 changes: 16 additions & 33 deletions pkg/common/common.go
@@ -44,7 +44,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/mount"
)

@@ -141,6 +140,9 @@ type MountConfig struct {
// NamePattern name pattern check
// only discover file name matching pattern("*" by default)
NamePattern string `json:"namePattern" yaml:"namePattern"`
// Additional selector terms to set for node affinity in addition to the provisioner node name.
// Useful for shared disks as affinity can not be changed after provisioning the PV.
Selector []v1.NodeSelectorTerm `json:"selector" yaml:"selector"`
}

// RuntimeConfig stores all the objects that the provisioner needs to run
@@ -495,44 +497,25 @@ func GetVolumeMode(volUtil util.VolumeUtil, fullPath string) (v1.PersistentVolum
return "", fmt.Errorf("Block device check for %q failed: %s", fullPath, errblk)
}

// NodeExists checks to see if a Node exists in the Indexer of a NodeLister.
// It tries to get the node and if it fails, it uses the well known label
// `kubernetes.io/hostname` to find the Node.
func NodeExists(nodeLister corelisters.NodeLister, nodeName string) (bool, error) {
_, err := nodeLister.Get(nodeName)
if errors.IsNotFound(err) {
// AnyNodeExists checks to see if a Node exists in the Indexer of a NodeLister.
// If this fails, it uses the well known label `kubernetes.io/hostname` to find the Node.
// It aborts early if an unexpected error occurs and it's uncertain if a node would exist or not.
func AnyNodeExists(nodeLister corelisters.NodeLister, nodeNames []string) bool {
for _, nodeName := range nodeNames {
_, err := nodeLister.Get(nodeName)
if err == nil || !errors.IsNotFound(err) {
return true
}
req, err := labels.NewRequirement(NodeLabelKey, selection.Equals, []string{nodeName})
if err != nil {
return false, err
return true
}
nodes, err := nodeLister.List(labels.NewSelector().Add(*req))
if err != nil {
return false, err
if err != nil || len(nodes) > 0 {
return true
}
return len(nodes) > 0, nil
}
return err == nil, err
}

// NodeAttachedToLocalPV gets the name of the Node that a local PV has a NodeAffinity to.
// It assumes that there should be only one matching Node for a local PV and that
// the local PV follows the form:
//
// nodeAffinity:
// required:
// nodeSelectorTerms:
// - matchExpressions:
// - key: kubernetes.io/hostname
// operator: In
// values:
// - <node1>
func NodeAttachedToLocalPV(pv *v1.PersistentVolume) (string, bool) {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
// We assume that there should only be one matching node.
if nodeNames == nil || len(nodeNames) != 1 {
return "", false
}
return nodeNames[0], true
return false
}

// IsLocalPVWithStorageClass checks that a PV is a local PV that belongs to any of the passed in StorageClasses.
119 changes: 64 additions & 55 deletions pkg/common/common_test.go
@@ -236,6 +236,46 @@ func TestLoadProvisionerConfigs(t *testing.T) {
},
nil,
},
{
map[string]string{"storageClassMap": `local-storage:
hostDir: /mnt/disks
mountDir: /mnt/disks
selector:
- matchExpressions:
- key: "kubernetes.io/hostname"
operator: "In"
values:
- otherNode1
`,
},
ProvisionerConfiguration{
StorageClassConfig: map[string]MountConfig{
"local-storage": {
HostDir: "/mnt/disks",
MountDir: "/mnt/disks",
BlockCleanerCommand: []string{"/scripts/quick_reset.sh"},
VolumeMode: "Filesystem",
NamePattern: "*",
Selector: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{"otherNode1"},
},
},
},
},
},
},
UseAlphaAPI: true,
MinResyncPeriod: metav1.Duration{
Duration: time.Hour + time.Minute*30,
},
},
nil,
},
}
for _, v := range testcases {
for name, value := range v.data {
@@ -477,7 +517,7 @@ func TestGetVolumeMode(t *testing.T) {
}
}

func TestNodeExists(t *testing.T) {
func TestAnyNodeExists(t *testing.T) {
nodeName := "test-node"
nodeWithName := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -495,21 +535,39 @@ func TestNodeExists(t *testing.T) {
tests := []struct {
nodeAdded *v1.Node
// Required.
nodeQueried *v1.Node
nodeQueried []string
expectedResult bool
}{
{
nodeAdded: nodeWithName,
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: true,
},
{
nodeAdded: nodeWithLabel,
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: true,
},
{
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: false,
},
{
nodeAdded: nodeWithName,
nodeQueried: []string{"other-node", nodeName},
expectedResult: true,
},
{
nodeAdded: nodeWithLabel,
nodeQueried: []string{"other-node", nodeName},
expectedResult: true,
},
{
nodeQueried: []string{},
expectedResult: false,
},
{
nodeQueried: nil,
expectedResult: false,
},
}
@@ -523,62 +581,13 @@ func TestNodeExists(t *testing.T) {
nodeInformer.Informer().GetStore().Add(test.nodeAdded)
}

exists, err := NodeExists(nodeInformer.Lister(), test.nodeQueried.Name)
if err != nil {
t.Errorf("Got unexpected error: %s", err.Error())
}
exists := AnyNodeExists(nodeInformer.Lister(), test.nodeQueried)
if exists != test.expectedResult {
t.Errorf("expected result: %t, actual: %t", test.expectedResult, exists)
}
}
}

func TestNodeAttachedToLocalPV(t *testing.T) {
nodeName := "testNodeName"

tests := []struct {
name string
pv *v1.PersistentVolume
expectedNodeName string
expectedStatus bool
}{
{
name: "NodeAffinity will all necessary fields",
pv: withNodeAffinity(pv(), []string{nodeName}, NodeLabelKey),
expectedNodeName: nodeName,
expectedStatus: true,
},
{
name: "empty nodeNames array",
pv: withNodeAffinity(pv(), []string{}, NodeLabelKey),
expectedNodeName: "",
expectedStatus: false,
},
{
name: "multiple nodeNames",
pv: withNodeAffinity(pv(), []string{nodeName, "newNode"}, NodeLabelKey),
expectedNodeName: "",
expectedStatus: false,
},
{
name: "wrong node label key",
pv: withNodeAffinity(pv(), []string{nodeName}, "wrongLabel"),
expectedNodeName: "",
expectedStatus: false,
},
}

for _, test := range tests {
nodeName, ok := NodeAttachedToLocalPV(test.pv)
if ok != test.expectedStatus {
t.Errorf("test: %s, status: %t, expectedStaus: %t", test.name, ok, test.expectedStatus)
}
if nodeName != test.expectedNodeName {
t.Errorf("test: %s, nodeName: %s, expectedNodeName: %s", test.name, nodeName, test.expectedNodeName)
}
}
}

func TestIsLocalPVWithStorageClass(t *testing.T) {
tests := []struct {
name string
101 changes: 33 additions & 68 deletions pkg/discovery/discovery.go
@@ -23,6 +23,7 @@ import (
"hash/fnv"
"net/http"
"path/filepath"
"slices"
"strings"
"sync"
"time"
@@ -46,11 +47,10 @@ type Discoverer struct {
Labels map[string]string
// ProcTable is a reference to running processes so that we can prevent PV from being created while
// it is being cleaned
CleanupTracker *deleter.CleanupStatusTracker
nodeAffinityAnn string
nodeAffinity *v1.VolumeNodeAffinity
classLister storagev1listers.StorageClassLister
ownerReference *metav1.OwnerReference
CleanupTracker *deleter.CleanupStatusTracker
nodeSelector *v1.NodeSelector
classLister storagev1listers.StorageClassLister
ownerReference *metav1.OwnerReference

Readyz *readyzCheck
}
@@ -106,38 +106,17 @@ func NewDiscoverer(config *common.RuntimeConfig, cleanupTracker *deleter.Cleanup
return nil, fmt.Errorf("Failed to generate owner reference: %v", err)
}

if config.UseAlphaAPI {
nodeAffinity, err := generateNodeAffinity(config.Node)
if err != nil {
return nil, fmt.Errorf("Failed to generate node affinity: %v", err)
}
tmpAnnotations := map[string]string{}
err = StorageNodeAffinityToAlphaAnnotation(tmpAnnotations, nodeAffinity)
if err != nil {
return nil, fmt.Errorf("Failed to convert node affinity to alpha annotation: %v", err)
}
return &Discoverer{
RuntimeConfig: config,
Labels: labelMap,
CleanupTracker: cleanupTracker,
classLister: sharedInformer.Lister(),
nodeAffinityAnn: tmpAnnotations[common.AlphaStorageNodeAffinityAnnotation],
ownerReference: ownerRef,
Readyz: &readyzCheck{},
}, nil
}

volumeNodeAffinity, err := generateVolumeNodeAffinity(config.Node)
nodeSelector, err := generateNodeSelector(config.Node)
if err != nil {
return nil, fmt.Errorf("Failed to generate volume node affinity: %v", err)
return nil, fmt.Errorf("Failed to generate node selector: %v", err)
}

return &Discoverer{
RuntimeConfig: config,
Labels: labelMap,
CleanupTracker: cleanupTracker,
classLister: sharedInformer.Lister(),
nodeAffinity: volumeNodeAffinity,
nodeSelector: nodeSelector,
ownerReference: ownerRef,
Readyz: &readyzCheck{},
}, nil
@@ -160,7 +139,7 @@ func generateOwnerReference(node *v1.Node) (*metav1.OwnerReference, error) {
}, nil
}

func generateNodeAffinity(node *v1.Node) (*v1.NodeAffinity, error) {
func generateNodeSelector(node *v1.Node) (*v1.NodeSelector, error) {
if node.Labels == nil {
return nil, fmt.Errorf("Node does not have labels")
}
@@ -169,42 +148,14 @@ func generateNodeAffinity(node *v1.Node) (*v1.NodeAffinity, error) {
return nil, fmt.Errorf("Node does not have expected label %s", common.NodeLabelKey)
}

return &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: common.NodeLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeValue},
},
},
},
},
},
}, nil
}

func generateVolumeNodeAffinity(node *v1.Node) (*v1.VolumeNodeAffinity, error) {
if node.Labels == nil {
return nil, fmt.Errorf("Node does not have labels")
}
nodeValue, found := node.Labels[common.NodeLabelKey]
if !found {
return nil, fmt.Errorf("Node does not have expected label %s", common.NodeLabelKey)
}

return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: common.NodeLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeValue},
},
return &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: common.NodeLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeValue},
},
},
},
@@ -437,11 +388,25 @@ func (d *Discoverer) createPV(file, class string, reclaimPolicy v1.PersistentVol
OwnerReference: d.ownerReference,
}

volumeNodeSelector := &v1.NodeSelector{
NodeSelectorTerms: slices.Concat(d.nodeSelector.NodeSelectorTerms, config.Selector),
}

if d.UseAlphaAPI {
nodeAffinity := &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: volumeNodeSelector,
}
tmpAnnotations := map[string]string{}
err := StorageNodeAffinityToAlphaAnnotation(tmpAnnotations, nodeAffinity)
if err != nil {
return fmt.Errorf("error converting volume affinity to alpha annotation: %v", err)
}
localPVConfig.UseAlphaAPI = true
localPVConfig.AffinityAnn = d.nodeAffinityAnn
localPVConfig.AffinityAnn = tmpAnnotations[common.AlphaStorageNodeAffinityAnnotation]
} else {
localPVConfig.NodeAffinity = d.nodeAffinity
localPVConfig.NodeAffinity = &v1.VolumeNodeAffinity{
Required: volumeNodeSelector,
}
}

if config.FsType != "" {
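
The net effect of the createPV change above: the PV's node affinity becomes the concatenation of the term generated from the provisioner's own node and any terms from the class's selector config. On the non-alpha path this lands in the PV spec; with UseAlphaAPI it is written as the alpha annotation instead. A sketch of the resulting affinity, with placeholder node names:

nodeAffinity:
  required:
    nodeSelectorTerms:
      # term generated from the provisioner's node (kubernetes.io/hostname label)
      - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
              - provisioner-node
      # additional term taken from the class's selector config
      - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
              - otherNode1
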
8 changes: 4 additions & 4 deletions pkg/discovery/discovery_test.go
@@ -753,16 +753,16 @@ func TestUseAlphaAPI(t *testing.T) {
if d.UseAlphaAPI {
t.Fatal("UseAlphaAPI should be false")
}
if len(d.nodeAffinityAnn) != 0 || d.nodeAffinity == nil {
t.Fatal("the value nodeAffinityAnn shouldn't be set while nodeAffinity should")
if d.nodeSelector == nil {
t.Fatal("the value nodeSelector should be set")
}

d = testSetup(t, test, true, false)
if !d.UseAlphaAPI {
t.Fatal("UseAlphaAPI should be true")
}
if d.nodeAffinity != nil || len(d.nodeAffinityAnn) == 0 {
t.Fatal("the value nodeAffinityAnn should be set while nodeAffinity should not")
if d.nodeSelector == nil {
t.Fatal("the value nodeSelector should be set")
}
}

30 changes: 11 additions & 19 deletions pkg/node-cleanup/controller/controller.go
@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup"
@@ -196,18 +197,15 @@ func (c *CleanupController) syncHandler(ctx context.Context, pvName string) erro
return err
}

nodeName, ok := common.NodeAttachedToLocalPV(pv)
if !ok {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
if nodeNames == nil {
// For whatever reason the PV isn't formatted properly so we will
// never be able to get its corresponding Node, so ignore.
klog.Errorf("error getting node attached to pv: %s", pv)
return nil
}

nodeExists, err := common.NodeExists(c.nodeLister, nodeName)
if err != nil {
return err
}
nodeExists := common.AnyNodeExists(c.nodeLister, nodeNames)
// Check that the node the PV/PVC reference is still deleted
if nodeExists {
return nil
@@ -242,7 +240,7 @@ func (c *CleanupController) syncHandler(ctx context.Context, pvName string) erro
}

cleanupmetrics.PersistentVolumeClaimDeleteTotal.Inc()
klog.Infof("Deleted PVC %q that pointed to Node %q", pvClaimRef.Name, nodeName)
klog.Infof("Deleted PVC %q that pointed to non-existent Nodes %q", pvClaimRef.Name, nodeNames)
return nil
}

@@ -264,18 +262,13 @@ func (c *CleanupController) startCleanupTimersIfNeeded() {
continue
}

nodeName, ok := common.NodeAttachedToLocalPV(pv)
if !ok {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
if nodeNames == nil {
klog.Errorf("error getting node attached to pv: %s", pv)
continue
}

shouldEnqueue, err := c.shouldEnqueueEntry(pv, nodeName)
if err != nil {
klog.Errorf("error determining whether to enqueue entry with pv %q: %v", pv.Name, err)
continue
}

shouldEnqueue := c.shouldEnqueueEntry(pv, nodeNames)
if shouldEnqueue {
klog.Infof("Starting timer for resource deletion, resource:%s, timer duration: %s", pv.Spec.ClaimRef, c.pvcDeletionDelay.String())
c.eventRecorder.Event(pv.Spec.ClaimRef, v1.EventTypeWarning, "ReferencedNodeDeleted", fmt.Sprintf("PVC is tied to a deleted Node. PVC will be cleaned up in %s if the Node doesn't come back", c.pvcDeletionDelay.String()))
@@ -288,13 +281,12 @@ func (c *CleanupController) startCleanupTimersIfNeeded() {
// shouldEnqueuePV checks if a PV should be enqueued to the entryQueue.
// The PV must be a local PV, have a StorageClass present in the list of storageClassNames, have a NodeAffinity
// to a deleted Node, and have a PVC bound to it (otherwise there's nothing to clean up).
func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeName string) (bool, error) {
func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeNames []string) bool {
if pv.Spec.ClaimRef == nil {
return false, nil
return false
}

exists, err := common.NodeExists(c.nodeLister, nodeName)
return !exists && err == nil, err
return !common.AnyNodeExists(c.nodeLister, nodeNames)
}

// deletePVC deletes the PVC with the given name and namespace
20 changes: 7 additions & 13 deletions pkg/node-cleanup/deleter/deleter.go
@@ -18,7 +18,6 @@ package deleter

import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
@@ -28,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup"
@@ -82,12 +82,7 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
continue
}

referencesDeletedNode, err := d.referencesNonExistentNode(pv)
if err != nil {
klog.Errorf("error determining if pv %q references deleted node: %v", pv.Name, err)
continue
}
if !referencesDeletedNode {
if !d.referencesNonExistentNode(pv) {
// PV's node is up so PV is not stale
continue
}
@@ -124,14 +119,13 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
// operator: In
// values:
// - <node1>
func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) (bool, error) {
nodeName, ok := common.NodeAttachedToLocalPV(localPV)
if !ok {
return false, fmt.Errorf("Error retrieving node")
func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) bool {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(localPV)
if nodeNames == nil {
return false
}

exists, err := common.NodeExists(d.nodeLister, nodeName)
return !exists && err == nil, err
return !common.AnyNodeExists(d.nodeLister, nodeNames)
}

func (d *Deleter) deletePV(ctx context.Context, pvName string) error {
