From f2e42c19dbc4a52bb2b389f5d509923151eaa0b2 Mon Sep 17 00:00:00 2001
From: Omkar Lavangad <141731270+olavangad-px@users.noreply.github.com>
Date: Wed, 27 Sep 2023 14:21:26 +0530
Subject: [PATCH] Adding StorageClass parameter preferRemoteNode to disable
 Anti-HyperConvergence for sharedv4 service volumes (#1515)

---
 pkg/extender/extender.go      | 40 ++++++++++++----
 pkg/extender/extender_test.go | 90 +++++++++++++++++++++++++++++++++++
 2 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/pkg/extender/extender.go b/pkg/extender/extender.go
index edede0ea37..2ec7d43abb 100644
--- a/pkg/extender/extender.go
+++ b/pkg/extender/extender.go
@@ -47,6 +47,8 @@ const (
 	preferLocalNodeOnlyAnnotation = "stork.libopenstorage.org/preferLocalNodeOnly"
 	// StorageClass parameter to check if only remote nodes should be used to schedule a pod
 	preferRemoteNodeOnlyParameter = "stork.libopenstorage.org/preferRemoteNodeOnly"
+	// StorageClass parameter to disable anti-hyperconvergence for pods using sharedv4 service volumes
+	preferRemoteNodeParameter = "stork.libopenstorage.org/preferRemoteNode"
 	// annotation to skip a volume and its local node replicas for scoring while
 	// scheduling a pod
 	skipScoringLabel = "stork.libopenstorage.org/skipSchedulerScoring"
@@ -241,8 +243,10 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 		for _, volumeNode := range volumeInfo.DataNodes {
 			for _, driverNode := range driverNodes {
 				if volumeNode == driverNode.StorageID {
-					// preferRemoteNodeOnly applies only to volumes with NeedsAntiHyperconvergence
-					if e.volumePrefersRemoteOnly(volumeInfo) && volumeInfo.NeedsAntiHyperconvergence {
+					// preferRemoteNode and preferRemoteNodeOnly apply only to volumes with NeedsAntiHyperconvergence
+					if volumeInfo.NeedsAntiHyperconvergence &&
+						e.volumePrefersRemoteNode(volumeInfo) &&
+						e.volumePrefersRemoteNodeOnly(volumeInfo) {
 						preferRemoteOnlyExists = true
 						nodeNoAntiHyperconvergedPodAllowed[driverNode.StorageID] = true
 					}
@@ -273,15 +277,18 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 	}
 
 	hyperconvergenceVolumeCount := 0
-	nodeHyperconverenceVolumeCount := make(map[string]int)
+	nodeHyperconvergenceVolumeCount := make(map[string]int)
 	for _, volumeInfo := range driverVolumes {
 		if !volumeInfo.NeedsAntiHyperconvergence {
 			hyperconvergenceVolumeCount++
 		}
 		for _, volumeNode := range volumeInfo.DataNodes {
-			// nodeToHyperconverenceVolumeCount is used to determine if valid nodes exist with preferLocalNodeOnly
+			// This loop counts the total number of volumes available on each node.
+			// The count is used later to decide whether a node can strictly enforce hyperconvergence with preferLocalNodeOnly.
+			// preferLocalNodeOnly doesn't apply to volumes with NeedsAntiHyperconvergence set to true,
+			// so an explicit volumePrefersRemoteNode or volumePrefersRemoteNodeOnly check is not necessary here.
 			if preferLocalOnly && !volumeInfo.NeedsAntiHyperconvergence {
-				nodeHyperconverenceVolumeCount[volumeNode]++
+				nodeHyperconvergenceVolumeCount[volumeNode]++
 			}
 		}
 	}
@@ -294,7 +301,7 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 			// If only nodes with replicas are to be preferred,
 			// filter out all nodes that don't have a replica
 			// for all the volumes
-			if preferLocalOnly && nodeHyperconverenceVolumeCount[driverNode.StorageID] != hyperconvergenceVolumeCount {
+			if preferLocalOnly && nodeHyperconvergenceVolumeCount[driverNode.StorageID] != hyperconvergenceVolumeCount {
 				continue
 			}
 			if val, ok := nodeNoAntiHyperconvergedPodAllowed[driverNode.StorageID]; ok && val {
@@ -354,8 +361,8 @@ func (e *Extender) encodeFilterResponse(encoder *json.Encoder,
 	}
 }
 
-// volumePrefersRemoteOnly checks if preferRemoteNodeOnly label is applied to the volume
-func (e *Extender) volumePrefersRemoteOnly(volumeInfo *volume.Info) bool {
+// volumePrefersRemoteNodeOnly checks if the preferRemoteNodeOnly label is applied to the volume
+func (e *Extender) volumePrefersRemoteNodeOnly(volumeInfo *volume.Info) bool {
 	if volumeInfo.Labels != nil {
 		if value, ok := volumeInfo.Labels[preferRemoteNodeOnlyParameter]; ok {
 			if preferRemoteOnlyExists, err := strconv.ParseBool(value); err == nil {
@@ -366,6 +373,18 @@ func (e *Extender) volumePrefersRemoteOnly(volumeInfo *volume.Info) bool {
 	return false
 }
 
+// volumePrefersRemoteNode checks if the preferRemoteNode label is applied to the volume; it defaults to true when the label is absent or unparsable.
+func (e *Extender) volumePrefersRemoteNode(volumeInfo *volume.Info) bool {
+	if volumeInfo.Labels != nil {
+		if value, ok := volumeInfo.Labels[preferRemoteNodeParameter]; ok {
+			if preferRemoteExists, err := strconv.ParseBool(value); err == nil {
+				return preferRemoteExists
+			}
+		}
+	}
+	return true
+}
+
 func (e *Extender) collectExtenderMetrics() error {
 	fn := func(object runtime.Object) error {
 		pod, ok := object.(*v1.Pod)
@@ -652,7 +671,7 @@ func (e *Extender) processPrioritizeRequest(w http.ResponseWriter, req *http.Req
 				storklog.PodLog(pod).Debugf("Skipping volume %v from scoring", volume.VolumeName)
 				continue
 			}
-			if volume.NeedsAntiHyperconvergence {
+			if volume.NeedsAntiHyperconvergence && e.volumePrefersRemoteNode(volume) {
 				isAntihyperconvergenceRequired = true
 				storklog.PodLog(pod).Debugf("Skipping NeedsAntiHyperconvergence volume %v from scoring based on hyperconvergence", volume.VolumeName)
 				continue
@@ -730,7 +749,8 @@ func (e *Extender) updateForAntiHyperconvergence(
 			storklog.PodLog(pod).Debugf("Skipping volume %v from scoring during antihyperconvergence evaluation due to skipScoringLabel", volume.VolumeName)
 			continue
 		}
-		if !volume.NeedsAntiHyperconvergence {
+		// We want hyperconvergence-based scoring for NeedsAntiHyperconvergence volumes with the preferRemoteNode parameter set to false
+		if !volume.NeedsAntiHyperconvergence || !e.volumePrefersRemoteNode(volume) {
 			storklog.PodLog(pod).Debugf("Skipping volume %v from scoring based on antihyperconvergence", volume.VolumeName)
 			continue
 		}
diff --git a/pkg/extender/extender_test.go b/pkg/extender/extender_test.go
index dd38dfc69b..556fecf4e5 100644
--- a/pkg/extender/extender_test.go
+++ b/pkg/extender/extender_test.go
@@ -342,11 +342,13 @@ func TestExtender(t *testing.T) {
 	t.Run("preferRemoteNodeOnlyIgnoredForHyperConvergedVolumesTest", preferRemoteNodeOnlyIgnoredForHyperConvergedVolumesTest)
 	t.Run("preferRemoteNodeOnlyFailedSchedulingTest", preferRemoteNodeOnlyFailedSchedulingTest)
t.Run("preferRemoteNodeOnlyAntiHyperConvergenceTest", preferRemoteNodeOnlyAntiHyperConvergenceTest) + t.Run("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", preferRemoteNodeFalseAntiHyperConvergenceFilterTest) t.Run("antiHyperConvergenceTest", antiHyperConvergenceTest) t.Run("offlineNodesAntiHyperConvergenceTest", offlineNodesAntiHyperConvergenceTest) t.Run("multiVolumeAntiHyperConvergenceTest", multiVolumeAntiHyperConvergenceTest) t.Run("multiVolume2AntiHyperConvergenceTest", multiVolume2AntiHyperConvergenceTest) t.Run("multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest", multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest) + t.Run("multiVolume4PreferRemoteNodeAntiHyperConvergenceTest", multiVolume4PreferRemoteNodeAntiHyperConvergenceTest) t.Run("multiVolumeSkipAllVolumeScoringTest", multiVolumeSkipAllVolumeScoringTest) t.Run("multiVolumeSkipHyperConvergedVolumesScoringTest", multiVolumeSkipHyperConvergedVolumesScoringTest) t.Run("multiVolumeWithStorageDownNodesAntiHyperConvergenceTest", multiVolumeWithStorageDownNodesAntiHyperConvergenceTest) @@ -1655,6 +1657,34 @@ func preferRemoteNodeOnlyAntiHyperConvergenceTest(t *testing.T) { verifyFilterResponse(t, nodes, []int{4, 5}, filterResponse) } +// Apply preferRemoteNodeOnly = true and preferRemoteNode == false parameters together to a NeedsAntiHyperconvergence volume. +// preferRemoteNodeOnly should get ignored and filter api should return all nodes +func preferRemoteNodeFalseAntiHyperConvergenceFilterTest(t *testing.T) { + nodes := &v1.NodeList{} + nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "a", "zone1")) + nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack1", "a", "zone1")) + nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack1", "a", "zone1")) + nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack2", "b", "zone2")) + nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "b", "zone2")) + nodes.Items = append(nodes.Items, *newNode("node6", "node6", "192.168.0.6", "rack2", "b", "zone2")) + + if err := driver.CreateCluster(6, nodes); err != nil { + t.Fatalf("Error creating cluster: %v", err) + } + pod := newPod("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", map[string]bool{"preferRemoteNodeFalseAntiHyperConvergenceFilterTest": false}) + + provNodes := []int{0, 1, 2} + if err := driver.ProvisionVolume("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", provNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "false"}, true, false); err != nil { + t.Fatalf("Error provisioning volume: %v", err) + } + + filterResponse, err := sendFilterRequest(pod, nodes) + if err != nil { + t.Fatalf("Error sending filter request: %v", err) + } + verifyFilterResponse(t, nodes, []int{0, 1, 2, 3, 4, 5}, filterResponse) +} + // Use a antiHyperConverged volumes volume such that there are non replica nodes available // Higher scores should be given to the non antiHyperConverged volumes volume nodes func antiHyperConvergenceTest(t *testing.T) { @@ -1881,6 +1911,66 @@ func multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest(t *testing.T) { prioritizeResponse) } +// Deploy both anti hyperconveged volumes with preferRemoteNode false and true respectively together with regular volumes +// Verify hyperconvergence for NeedsAntiHyperconvergence volumes with preferRemoteNode parameter set to false +func 
+	nodes := &v1.NodeList{}
+	nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "a", "zone1"))
+	nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack1", "a", "zone1"))
+	nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack1", "a", "zone1"))
+	nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack2", "b", "zone2"))
+	nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "b", "zone2"))
+	nodes.Items = append(nodes.Items, *newNode("node6", "node6", "192.168.0.6", "rack2", "b", "zone2"))
+
+	filteredNodes := &v1.NodeList{}
+	filteredNodes.Items = append(filteredNodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "a", "zone1"))
+	filteredNodes.Items = append(filteredNodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack1", "a", "zone1"))
+	filteredNodes.Items = append(filteredNodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack2", "b", "zone2"))
+	filteredNodes.Items = append(filteredNodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "b", "zone2"))
+	filteredNodes.Items = append(filteredNodes.Items, *newNode("node6", "node6", "192.168.0.6", "rack2", "b", "zone2"))
+
+	if err := driver.CreateCluster(6, nodes); err != nil {
+		t.Fatalf("Error creating cluster: %v", err)
+	}
+	pod := newPod("multiVolume4PreferRemoteNodeAntiHyperConvergenceTest", map[string]bool{"HyperConvergedVolumes4": false, "sharedV4Svc41": false, "sharedV4Svc42": false})
+
+	regularVolumeProvNodes := []int{0, 1, 2}
+	if err := driver.ProvisionVolume("HyperConvergedVolumes4", regularVolumeProvNodes, 3, nil, false, false); err != nil {
+		t.Fatalf("Error provisioning volume: %v", err)
+	}
+
+	sharedV4SvcPreferRemoteFalseProvNodes := []int{3, 4, 5}
+	if err := driver.ProvisionVolume("sharedV4Svc41", sharedV4SvcPreferRemoteFalseProvNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "false"}, true, false); err != nil {
+		t.Fatalf("Error provisioning volume: %v", err)
+	}
+
+	sharedV4SvcProvNodes := []int{2}
+	if err := driver.ProvisionVolume("sharedV4Svc42", sharedV4SvcProvNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "true"}, true, false); err != nil {
+		t.Fatalf("Error provisioning volume: %v", err)
+	}
+
+	filterResponse, err := sendFilterRequest(pod, nodes)
+	if err != nil {
+		t.Fatalf("Error sending filter request: %v", err)
+	}
+	verifyFilterResponse(t, nodes, []int{0, 1, 3, 4, 5}, filterResponse)
+
+	prioritizeResponse, err := sendPrioritizeRequest(pod, filteredNodes)
+	if err != nil {
+		t.Fatalf("Error sending prioritize request: %v", err)
+	}
+	verifyPrioritizeResponse(
+		t,
+		filteredNodes,
+		[]float64{
+			2 * nodePriorityScore,
+			2 * nodePriorityScore,
+			2 * nodePriorityScore,
+			2 * nodePriorityScore,
+			2 * nodePriorityScore},
+		prioritizeResponse)
+}
+
 // Verify skipSchedulerScoring is honored for antihyperconvegence volume pods
 func multiVolumeSkipHyperConvergedVolumesScoringTest(t *testing.T) {
 	nodes := &v1.NodeList{}
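
Reviewer note: below is a minimal standalone sketch (not part of the patch) of how the new
preferRemoteNode parameter is resolved by volumePrefersRemoteNode above. An absent or
unparsable value defaults to true, which keeps the existing anti-hyperconvergence behavior
for sharedv4 service volumes; only an explicit "false" disables it. The package name, the
prefersRemoteNode helper, and the plain map standing in for volume labels are assumptions
for illustration only.

	package main

	import (
		"fmt"
		"strconv"
	)

	const preferRemoteNodeParameter = "stork.libopenstorage.org/preferRemoteNode"

	// prefersRemoteNode mirrors the default-true parsing added in extender.go:
	// return the parsed label value when present and valid, otherwise true.
	func prefersRemoteNode(labels map[string]string) bool {
		if value, ok := labels[preferRemoteNodeParameter]; ok {
			if b, err := strconv.ParseBool(value); err == nil {
				return b
			}
		}
		return true
	}

	func main() {
		fmt.Println(prefersRemoteNode(nil))                                                   // true: no labels set
		fmt.Println(prefersRemoteNode(map[string]string{preferRemoteNodeParameter: "false"})) // false: anti-hyperconvergence disabled
		fmt.Println(prefersRemoteNode(map[string]string{preferRemoteNodeParameter: "bogus"})) // true: unparsable value falls back to the default
	}

This default-true design means existing sharedv4 service volumes keep their current
scheduling behavior unless a StorageClass explicitly opts out via preferRemoteNode: "false",
in which case preferRemoteNodeOnly is also ignored, as exercised by
preferRemoteNodeFalseAntiHyperConvergenceFilterTest above.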