Skip to content

Commit

Permalink
Adding StorageClass parameter preferRemoteNode to disable Anti-HyperC…
Browse files Browse the repository at this point in the history
…onvergence for sharedv4 service volumes (#1515)
  • Loading branch information
olavangad-px authored Sep 27, 2023
1 parent d0d4d75 commit f2e42c1
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 10 deletions.
40 changes: 30 additions & 10 deletions pkg/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
preferLocalNodeOnlyAnnotation = "stork.libopenstorage.org/preferLocalNodeOnly"
// StorageClass parameter to check if only remote nodes should be used to schedule a pod
preferRemoteNodeOnlyParameter = "stork.libopenstorage.org/preferRemoteNodeOnly"
// StorageClass parameter to disable anti-hyperconvergence for pods using shared v4 service volumes
preferRemoteNodeParameter = "stork.libopenstorage.org/preferRemoteNode"
// annotation to skip a volume and its local node replicas for scoring while
// scheduling a pod
skipScoringLabel = "stork.libopenstorage.org/skipSchedulerScoring"
Expand Down Expand Up @@ -241,8 +243,10 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
for _, volumeNode := range volumeInfo.DataNodes {
for _, driverNode := range driverNodes {
if volumeNode == driverNode.StorageID {
// preferRemoteNodeOnly applies only to volumes with NeedsAntiHyperconvergence
if e.volumePrefersRemoteOnly(volumeInfo) && volumeInfo.NeedsAntiHyperconvergence {
// preferRemoteNode and preferRemoteNodeOnly apply only to volumes with NeedsAntiHyperconvergence
if volumeInfo.NeedsAntiHyperconvergence &&
e.volumePrefersRemoteNode(volumeInfo) &&
e.volumePrefersRemoteNodeOnly(volumeInfo) {
preferRemoteOnlyExists = true
nodeNoAntiHyperconvergedPodAllowed[driverNode.StorageID] = true
}
Expand Down Expand Up @@ -273,15 +277,18 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
}

hyperconvergenceVolumeCount := 0
nodeHyperconverenceVolumeCount := make(map[string]int)
nodeHyperconvergenceVolumeCount := make(map[string]int)
for _, volumeInfo := range driverVolumes {
if !volumeInfo.NeedsAntiHyperconvergence {
hyperconvergenceVolumeCount++
}
for _, volumeNode := range volumeInfo.DataNodes {
// nodeHyperconvergenceVolumeCount is used to determine if valid nodes exist with preferLocalNodeOnly
//This loop is calculating the total number of volumes that are available on one node.
//This is used to decide at a later step if the node can be used to strictly enforce hyperconvergence based on preferLocalNodeOnly.
//preferLocalNodeOnly doesn't apply to volumes with NeedsAntiHyperconvergence set to true.
//So an explicit volumePrefersRemoteNode or volumePrefersRemoteNodeOnly check is not necessary.
if preferLocalOnly && !volumeInfo.NeedsAntiHyperconvergence {
nodeHyperconverenceVolumeCount[volumeNode]++
nodeHyperconvergenceVolumeCount[volumeNode]++
}
}
}
Expand All @@ -294,7 +301,7 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
// If only nodes with replicas are to be preferred,
// filter out all nodes that don't have a replica
// for all the volumes
if preferLocalOnly && nodeHyperconverenceVolumeCount[driverNode.StorageID] != hyperconvergenceVolumeCount {
if preferLocalOnly && nodeHyperconvergenceVolumeCount[driverNode.StorageID] != hyperconvergenceVolumeCount {
continue
}
if val, ok := nodeNoAntiHyperconvergedPodAllowed[driverNode.StorageID]; ok && val {
Expand Down Expand Up @@ -354,8 +361,8 @@ func (e *Extender) encodeFilterResponse(encoder *json.Encoder,
}
}

// volumePrefersRemoteOnly checks if preferRemoteNodeOnly label is applied to the volume
func (e *Extender) volumePrefersRemoteOnly(volumeInfo *volume.Info) bool {
// volumePrefersRemoteNodeOnly checks if preferRemoteNodeOnly label is applied to the volume
func (e *Extender) volumePrefersRemoteNodeOnly(volumeInfo *volume.Info) bool {
if volumeInfo.Labels != nil {
if value, ok := volumeInfo.Labels[preferRemoteNodeOnlyParameter]; ok {
if preferRemoteOnlyExists, err := strconv.ParseBool(value); err == nil {
Expand All @@ -366,6 +373,18 @@ func (e *Extender) volumePrefersRemoteOnly(volumeInfo *volume.Info) bool {
return false
}

// volumePrefersRemoteNode reports whether the volume wants anti-hyperconverged
// (remote-node) placement. The preferRemoteNode StorageClass parameter, when
// present as a volume label and parseable as a bool, decides the result;
// otherwise the default is true.
func (e *Extender) volumePrefersRemoteNode(volumeInfo *volume.Info) bool {
	// Reading from a nil map yields the zero value, so no explicit nil
	// check on Labels is needed.
	value, ok := volumeInfo.Labels[preferRemoteNodeParameter]
	if !ok {
		// Label absent: anti-hyperconvergence stays enabled by default.
		return true
	}
	preferRemote, err := strconv.ParseBool(value)
	if err != nil {
		// Unparseable values fall back to the default of true.
		return true
	}
	return preferRemote
}

func (e *Extender) collectExtenderMetrics() error {
fn := func(object runtime.Object) error {
pod, ok := object.(*v1.Pod)
Expand Down Expand Up @@ -652,7 +671,7 @@ func (e *Extender) processPrioritizeRequest(w http.ResponseWriter, req *http.Req
storklog.PodLog(pod).Debugf("Skipping volume %v from scoring", volume.VolumeName)
continue
}
if volume.NeedsAntiHyperconvergence {
if volume.NeedsAntiHyperconvergence && e.volumePrefersRemoteNode(volume) {
isAntihyperconvergenceRequired = true
storklog.PodLog(pod).Debugf("Skipping NeedsAntiHyperconvergence volume %v from scoring based on hyperconvergence", volume.VolumeName)
continue
Expand Down Expand Up @@ -730,7 +749,8 @@ func (e *Extender) updateForAntiHyperconvergence(
storklog.PodLog(pod).Debugf("Skipping volume %v from scoring during antihyperconvergence evaluation due to skipScoringLabel", volume.VolumeName)
continue
}
if !volume.NeedsAntiHyperconvergence {
//We want hyperconvergence-based scoring for NeedsAntiHyperconvergence volumes with preferRemoteNode parameter set to false
if !volume.NeedsAntiHyperconvergence || !e.volumePrefersRemoteNode(volume) {
storklog.PodLog(pod).Debugf("Skipping volume %v from scoring based on antihyperconvergence", volume.VolumeName)
continue
}
Expand Down
90 changes: 90 additions & 0 deletions pkg/extender/extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,13 @@ func TestExtender(t *testing.T) {
t.Run("preferRemoteNodeOnlyIgnoredForHyperConvergedVolumesTest", preferRemoteNodeOnlyIgnoredForHyperConvergedVolumesTest)
t.Run("preferRemoteNodeOnlyFailedSchedulingTest", preferRemoteNodeOnlyFailedSchedulingTest)
t.Run("preferRemoteNodeOnlyAntiHyperConvergenceTest", preferRemoteNodeOnlyAntiHyperConvergenceTest)
t.Run("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", preferRemoteNodeFalseAntiHyperConvergenceFilterTest)
t.Run("antiHyperConvergenceTest", antiHyperConvergenceTest)
t.Run("offlineNodesAntiHyperConvergenceTest", offlineNodesAntiHyperConvergenceTest)
t.Run("multiVolumeAntiHyperConvergenceTest", multiVolumeAntiHyperConvergenceTest)
t.Run("multiVolume2AntiHyperConvergenceTest", multiVolume2AntiHyperConvergenceTest)
t.Run("multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest", multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest)
t.Run("multiVolume4PreferRemoteNodeAntiHyperConvergenceTest", multiVolume4PreferRemoteNodeAntiHyperConvergenceTest)
t.Run("multiVolumeSkipAllVolumeScoringTest", multiVolumeSkipAllVolumeScoringTest)
t.Run("multiVolumeSkipHyperConvergedVolumesScoringTest", multiVolumeSkipHyperConvergedVolumesScoringTest)
t.Run("multiVolumeWithStorageDownNodesAntiHyperConvergenceTest", multiVolumeWithStorageDownNodesAntiHyperConvergenceTest)
Expand Down Expand Up @@ -1655,6 +1657,34 @@ func preferRemoteNodeOnlyAntiHyperConvergenceTest(t *testing.T) {
verifyFilterResponse(t, nodes, []int{4, 5}, filterResponse)
}

// Apply preferRemoteNodeOnly = true together with preferRemoteNode = false to a
// NeedsAntiHyperconvergence volume. The preferRemoteNodeOnly parameter must be
// ignored, so the filter API is expected to return every node in the cluster.
func preferRemoteNodeFalseAntiHyperConvergenceFilterTest(t *testing.T) {
	type nodeSpec struct {
		name, ip, rack, zone, region string
	}
	specs := []nodeSpec{
		{"node1", "192.168.0.1", "rack1", "a", "zone1"},
		{"node2", "192.168.0.2", "rack1", "a", "zone1"},
		{"node3", "192.168.0.3", "rack1", "a", "zone1"},
		{"node4", "192.168.0.4", "rack2", "b", "zone2"},
		{"node5", "192.168.0.5", "rack2", "b", "zone2"},
		{"node6", "192.168.0.6", "rack2", "b", "zone2"},
	}
	nodes := &v1.NodeList{}
	for _, s := range specs {
		nodes.Items = append(nodes.Items, *newNode(s.name, s.name, s.ip, s.rack, s.zone, s.region))
	}

	if err := driver.CreateCluster(6, nodes); err != nil {
		t.Fatalf("Error creating cluster: %v", err)
	}
	pod := newPod("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", map[string]bool{"preferRemoteNodeFalseAntiHyperConvergenceFilterTest": false})

	// Replicas live on the first three nodes; the volume requests
	// anti-hyperconvergence but opts out via preferRemoteNode=false.
	provNodes := []int{0, 1, 2}
	if err := driver.ProvisionVolume("preferRemoteNodeFalseAntiHyperConvergenceFilterTest", provNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "false"}, true, false); err != nil {
		t.Fatalf("Error provisioning volume: %v", err)
	}

	filterResponse, err := sendFilterRequest(pod, nodes)
	if err != nil {
		t.Fatalf("Error sending filter request: %v", err)
	}
	// All six nodes must remain schedulable.
	verifyFilterResponse(t, nodes, []int{0, 1, 2, 3, 4, 5}, filterResponse)
}

// Use an antiHyperConverged volume such that there are non-replica nodes available.
// Higher scores should be given to the nodes that do not hold replicas of the antiHyperConverged volume.
func antiHyperConvergenceTest(t *testing.T) {
Expand Down Expand Up @@ -1881,6 +1911,66 @@ func multiVolume3PreferRemoteOnlyAntiHyperConvergenceTest(t *testing.T) {
prioritizeResponse)
}

// Deploy anti-hyperconverged volumes with preferRemoteNode set to false and
// true respectively, alongside a regular hyperconverged volume. Verify that
// hyperconvergence-based scoring is applied to the NeedsAntiHyperconvergence
// volume whose preferRemoteNode parameter is false.
func multiVolume4PreferRemoteNodeAntiHyperConvergenceTest(t *testing.T) {
	type nodeSpec struct {
		name, ip, rack, zone, region string
	}
	specs := []nodeSpec{
		{"node1", "192.168.0.1", "rack1", "a", "zone1"},
		{"node2", "192.168.0.2", "rack1", "a", "zone1"},
		{"node3", "192.168.0.3", "rack1", "a", "zone1"},
		{"node4", "192.168.0.4", "rack2", "b", "zone2"},
		{"node5", "192.168.0.5", "rack2", "b", "zone2"},
		{"node6", "192.168.0.6", "rack2", "b", "zone2"},
	}
	nodes := &v1.NodeList{}
	for _, s := range specs {
		nodes.Items = append(nodes.Items, *newNode(s.name, s.name, s.ip, s.rack, s.zone, s.region))
	}

	// The prioritize request is sent for the node set that survives
	// filtering: every node except node3 (index 2).
	filteredNodes := &v1.NodeList{}
	for i, s := range specs {
		if i == 2 {
			continue
		}
		filteredNodes.Items = append(filteredNodes.Items, *newNode(s.name, s.name, s.ip, s.rack, s.zone, s.region))
	}

	if err := driver.CreateCluster(6, nodes); err != nil {
		t.Fatalf("Error creating cluster: %v", err)
	}
	// NOTE(review): the pod name looks copy-pasted from
	// multiVolumeAntiHyperConvergenceTest — confirm the name is not significant.
	pod := newPod("multiVolumeAntiHyperConvergenceTest", map[string]bool{"HyperConvergedVolumes4": false, "sharedV4Svc41": false, "sharedV4Svc42": false})

	// Regular hyperconverged volume replicated on the first three nodes.
	regularVolumeProvNodes := []int{0, 1, 2}
	if err := driver.ProvisionVolume("HyperConvergedVolumes4", regularVolumeProvNodes, 3, nil, false, false); err != nil {
		t.Fatalf("Error provisioning volume: %v", err)
	}

	// Sharedv4 service volume that opts out of anti-hyperconvergence via
	// preferRemoteNode=false.
	sharedV4SvcPreferRemoteFalseProvNodes := []int{3, 4, 5}
	if err := driver.ProvisionVolume("sharedV4Svc41", sharedV4SvcPreferRemoteFalseProvNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "false"}, true, false); err != nil {
		t.Fatalf("Error provisioning volume: %v", err)
	}

	// Sharedv4 service volume that keeps anti-hyperconvergence enabled; its
	// replica node (node3, index 2) is expected to be filtered out below.
	sharedV4SvcProvNodes := []int{2}
	if err := driver.ProvisionVolume("sharedV4Svc42", sharedV4SvcProvNodes, 3, map[string]string{preferRemoteNodeOnlyParameter: "true", preferRemoteNodeParameter: "true"}, true, false); err != nil {
		t.Fatalf("Error provisioning volume: %v", err)
	}

	filterResponse, err := sendFilterRequest(pod, nodes)
	if err != nil {
		t.Fatalf("Error sending filter request: %v", err)
	}
	verifyFilterResponse(t, nodes, []int{0, 1, 3, 4, 5}, filterResponse)

	prioritizeResponse, err := sendPrioritizeRequest(pod, filteredNodes)
	if err != nil {
		t.Fatalf("Error sending prioritize request: %v", err)
	}
	// Every surviving node is expected to score 2 * nodePriorityScore.
	expectedScores := make([]float64, len(filteredNodes.Items))
	for i := range expectedScores {
		expectedScores[i] = 2 * nodePriorityScore
	}
	verifyPrioritizeResponse(t, filteredNodes, expectedScores, prioritizeResponse)
}

// Verify skipSchedulerScoring is honored for antihyperconvergence volume pods
func multiVolumeSkipHyperConvergedVolumesScoringTest(t *testing.T) {
nodes := &v1.NodeList{}
Expand Down

0 comments on commit f2e42c1

Please sign in to comment.