Skip to content

Commit

Permalink
[YUNIKORN-2498] Implement force create flag for recovery queue (#807)
Browse files Browse the repository at this point in the history
To place already running allocations on startup the core
supports a force create flag for an application. The flag will place the
application into the recovery queue if all other placements fail.
The shim sets the flag in the metadata if the pod has a nodename set
when the metadata is created.

Closes: #807

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
  • Loading branch information
wilfred-s committed Apr 2, 2024
1 parent 8be3910 commit aa20c7b
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 45 deletions.
15 changes: 15 additions & 0 deletions pkg/cache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)

func getTaskMetadata(pod *v1.Pod) (TaskMetadata, bool) {
Expand Down Expand Up @@ -77,6 +78,20 @@ func getAppMetadata(pod *v1.Pod) (ApplicationMetadata, bool) {
tags[constants.AppTagNamespace] = pod.Namespace
}

// Make sure we set the force create flag to true if the pod has been scheduled already.
// When we create and link the metadata to an application the application does not exist yet.
// The force flag prevents rejections during initialisation of already allocated pods in a changed
// queue configuration.
// It will also pick up static (mirror) and DaemonSet pods. In certain circumstances this could cause
// pods to be allowed into the recovery queue while they should not. If this becomes an issue we can
// add a filter here.
// NOTE: this could fail to set the flag if the oldest pod for the application is not scheduled and
// later pods are.
tags[common.AppTagCreateForce] = constants.False
if utils.IsAssignedPod(pod) {
tags[common.AppTagCreateForce] = constants.True
}

// attach imagePullSecrets if present
secrets := pod.Spec.ImagePullSecrets
if len(secrets) > 0 {
Expand Down
28 changes: 28 additions & 0 deletions pkg/cache/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)

const taskGroupInfo = `
Expand Down Expand Up @@ -141,6 +142,7 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
assert.Equal(t, app.Tags["namespace"], "default")
assert.Equal(t, app.Tags[constants.AnnotationSchedulingPolicyParam], "gangSchedulingStyle=Soft")
assert.Equal(t, app.Tags[constants.AppTagImagePullSecrets], "secret1,secret2")
assert.Equal(t, app.Tags[common.AppTagCreateForce], "false")
assert.Assert(t, app.Tags[constants.AnnotationTaskGroups] != "")
assert.Equal(t, app.TaskGroups[0].Name, "test-group-1")
assert.Equal(t, app.TaskGroups[0].MinMember, int32(3))
Expand Down Expand Up @@ -180,6 +182,7 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
assert.Equal(t, app.QueueName, "root.b")
assert.Equal(t, app.User, constants.DefaultUser)
assert.Equal(t, app.Tags["namespace"], "app-namespace-01")
assert.Equal(t, app.Tags[common.AppTagCreateForce], "false")
assert.Equal(t, len(app.TaskGroups), 0)
assert.Equal(t, app.SchedulingPolicyParameters.GetGangSchedulingStyle(), "Hard")

Expand Down Expand Up @@ -209,6 +212,7 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
app, ok = getAppMetadata(&pod)
assert.Equal(t, ok, true)
assert.Equal(t, app.SchedulingPolicyParameters.GetGangSchedulingStyle(), "Soft")
assert.Equal(t, app.Tags[common.AppTagCreateForce], "false")

pod = v1.Pod{
TypeMeta: apis.TypeMeta{
Expand Down Expand Up @@ -239,6 +243,30 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
app, ok = getAppMetadata(&pod)
assert.Equal(t, ok, true)
assert.Equal(t, app.SchedulingPolicyParameters.GetGangSchedulingStyle(), "Soft")
assert.Equal(t, app.Tags[common.AppTagCreateForce], constants.False)

pod = v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod00002",
Namespace: "app-namespace-01",
UID: "UID-POD-00001",
},
Spec: v1.PodSpec{
SchedulerName: constants.SchedulerName,
NodeName: Host1,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}

app, ok = getAppMetadata(&pod)
assert.Equal(t, ok, true)
assert.Equal(t, app.Tags[common.AppTagCreateForce], constants.True)

pod = v1.Pod{
TypeMeta: apis.TypeMeta{
Expand Down
1 change: 1 addition & 0 deletions test/e2e/framework/configmanager/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
QueuesPath = "ws/v1/partition/%s/queues"
AppsPath = "ws/v1/partition/%s/queue/%s/applications"
AppPath = "ws/v1/partition/%s/queue/%s/application/%s"
PartitionAppPath = "ws/v1/partition/%s/application/%s"
CompletedAppsPath = "ws/v1/partition/%s/applications/completed"
ConfigPath = "ws/v1/config"
ClustersPath = "ws/v1/clusters"
Expand Down
105 changes: 60 additions & 45 deletions test/e2e/framework/helpers/yunikorn/rest_api_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ func (c *RClient) do(req *http.Request, v interface{}) (*http.Response, error) {
return nil, err
}
defer resp.Body.Close()
// handle error response objects for non 200 responses.
if resp.StatusCode != http.StatusOK {
var daoErr *dao.YAPIError
err = json.NewDecoder(resp.Body).Decode(&daoErr)
if err != nil {
return resp, err
}
err = fmt.Errorf("YAPIError: %d, %s, %s", daoErr.StatusCode, daoErr.Message, daoErr.Description)
return resp, err
}
err = json.NewDecoder(resp.Body).Decode(v)
return resp, err
}
Expand All @@ -93,6 +103,16 @@ func (c *RClient) getBody(req *http.Request) (string, error) {
return "", err
}
defer resp.Body.Close()
// handle error response objects for non 200 responses.
if resp.StatusCode != http.StatusOK {
var daoErr *dao.YAPIError
err = json.NewDecoder(resp.Body).Decode(&daoErr)
if err != nil {
return "", err
}
err = fmt.Errorf("YAPIError: %d, %s, %s", daoErr.StatusCode, daoErr.Message, daoErr.Description)
return "", err
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
Expand Down Expand Up @@ -130,11 +150,40 @@ func (c *RClient) GetHealthCheck() (dao.SchedulerHealthDAOInfo, error) {
return healthCheck, err
}

// GetPartitions sends a GET request to the partitions REST endpoint
// (configmanager.PartitionsPath) and decodes the reply into a slice of
// partition descriptors.
// It returns nil and the error when the request cannot be built; otherwise it
// returns the decoded slice together with whatever error the call reported.
func (c *RClient) GetPartitions() ([]*dao.PartitionInfo, error) {
	request, reqErr := c.newRequest("GET", configmanager.PartitionsPath, nil)
	if reqErr != nil {
		return nil, reqErr
	}

	var result []*dao.PartitionInfo
	_, callErr := c.do(request, &result)
	return result, callErr
}

// WaitForRegistration blocks until the named partition shows up in the list
// returned by the REST API, checking once per second for at most timeout
// seconds. The error from the poll helper is returned unchanged.
func (c *RClient) WaitForRegistration(partition string, timeout int) error {
	limit := time.Duration(timeout) * time.Second
	condition := c.isPartitionPresent(partition).WithContext()
	return wait.PollUntilContextTimeout(context.Background(), time.Second, limit, false, condition)
}

// isPartitionPresent builds a poll condition that reports true once a
// partition with the given name is returned by GetPartitions.
// The condition never returns an error: a failed lookup is reported as
// "not present yet" so that the surrounding poll keeps retrying.
func (c *RClient) isPartitionPresent(partition string) wait.ConditionFunc {
	check := func() (bool, error) {
		known, lookupErr := c.GetPartitions()
		if lookupErr != nil {
			// swallow the error on purpose: the caller waits and loops
			return false, nil
		}
		found := false
		for i := range known {
			if known[i].Name == partition {
				found = true
				break
			}
		}
		return found, nil
	}
	return check
}

// WaitforQueueToAppear polls once per second, for at most timeout seconds,
// using the queue-presence condition built for queueName in the given
// partition, and returns the poll helper's error unchanged.
// NOTE(review): two return statements follow; only the first is reachable.
// They differ in the context used (TODO vs Background) and in the condition
// helper's name (IsQueuePresent vs isQueuePresent) — confirm which is intended.
func (c *RClient) WaitforQueueToAppear(partition string, queueName string, timeout int) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Duration(timeout)*time.Second, false, c.IsQueuePresent(partition, queueName).WithContext())
return wait.PollUntilContextTimeout(context.Background(), time.Second, time.Duration(timeout)*time.Second, false, c.isQueuePresent(partition, queueName).WithContext())
}

func (c *RClient) IsQueuePresent(partition string, queueName string) wait.ConditionFunc {
func (c *RClient) isQueuePresent(partition string, queueName string) wait.ConditionFunc {
return func() (bool, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.QueuesPath, partition), nil)
if err != nil {
Expand All @@ -157,21 +206,6 @@ func (c *RClient) IsQueuePresent(partition string, queueName string) wait.Condit
}
}

// GetAllAppInfos requests the applications endpoint and decodes the reply
// into the Applications field of a fresh ApplicationsDAOInfo.
// It returns nil and the error if building or executing the request fails.
// NOTE(review): configmanager.AppsPath is passed as-is, without filling in its
// format placeholders — confirm the endpoint resolves as intended.
func (c *RClient) GetAllAppInfos() (*dao.ApplicationsDAOInfo, error) {
	request, err := c.newRequest("GET", configmanager.AppsPath, nil)
	if err != nil {
		return nil, err
	}

	result := &dao.ApplicationsDAOInfo{}
	if _, err = c.do(request, &result.Applications); err != nil {
		return nil, err
	}
	return result, nil
}

func (c *RClient) GetApps(partition string, queueName string) ([]*dao.ApplicationDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.AppsPath, partition, queueName), nil)
if err != nil {
Expand All @@ -183,7 +217,13 @@ func (c *RClient) GetApps(partition string, queueName string) ([]*dao.Applicatio
}

func (c *RClient) GetAppInfo(partition string, queueName string, appID string) (*dao.ApplicationDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.AppPath, partition, queueName, appID), nil)
var path string
if len(queueName) == 0 {
path = fmt.Sprintf(configmanager.PartitionAppPath, partition, appID)
} else {
path = fmt.Sprintf(configmanager.AppPath, partition, queueName, appID)
}
req, err := c.newRequest("GET", path, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,31 +382,6 @@ func (c *RClient) ValidateSchedulerConfig(cm v1.ConfigMap) (*dao.ValidateConfRes
return validateConfResponse, err
}

// isRootSched builds a poll condition that inspects the queue info returned
// by GetQueues for DefaultPartition.
// When policy equals DefaultPartition the condition holds if the returned
// properties are empty; otherwise it holds if the "application.sort.policy"
// property equals policy. A request error or a nil result aborts with an error.
func isRootSched(policy string) wait.ConditionFunc {
	return func() (bool, error) {
		client := RClient{}
		rootInfo, err := client.GetQueues(DefaultPartition)
		switch {
		case err != nil:
			return false, err
		case rootInfo == nil:
			return false, errors.New("no response from rest client")
		case policy == DefaultPartition:
			return len(rootInfo.Properties) == 0, nil
		default:
			return rootInfo.Properties["application.sort.policy"] == policy, nil
		}
	}
}

func WaitForSchedPolicy(policy string, timeout time.Duration) error {
return wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, timeout, false, isRootSched(policy).WithContext())
}

func GetFailedHealthChecks() (string, error) {
restClient := RClient{}
var failCheck string
Expand Down Expand Up @@ -443,7 +458,7 @@ func (c *RClient) LogAppsInfo(ns string) error {
}

func (c *RClient) LogQueuesInfo() error {
qInfo, getQErr := c.GetPartitions(DefaultPartition)
qInfo, getQErr := c.GetPartitionQueues(DefaultPartition)
if getQErr != nil {
return getQErr
}
Expand All @@ -468,7 +483,7 @@ func (c *RClient) LogNodesInfo() error {
return nil
}

func (c *RClient) GetPartitions(partition string) (*dao.PartitionQueueDAOInfo, error) {
func (c *RClient) GetPartitionQueues(partition string) (*dao.PartitionQueueDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.QueuesPath, partition), nil)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package restartchangedconfig_test

import (
"path/filepath"
"testing"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/reporters"
"github.com/onsi/gomega"

"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
)

// init parses the flags of the shared e2e test configuration when the test
// package is loaded.
func init() {
configmanager.YuniKornTestConfig.ParseFlags()
}

// TestRestartChangedConfig is the go test entry point for this Ginkgo suite.
// It registers an after-suite reporter that writes a JUnit XML report into the
// configured log directory, wires Ginkgo's Fail into Gomega, and then runs the
// specs under the "TestRestartChangedConfig" description and label.
func TestRestartChangedConfig(t *testing.T) {
	const suiteName = "TestRestartChangedConfig"

	// The report path is resolved when the reporter runs, not at registration.
	writeJUnit := func(report ginkgo.Report) {
		target := filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-restartchangedconfig_junit.xml")
		options := reporters.JunitReportConfig{OmitSpecLabels: true}
		reportErr := reporters.GenerateJUnitReportWithConfig(report, target, options)
		Ω(reportErr).NotTo(HaveOccurred())
	}

	ginkgo.ReportAfterSuite(suiteName, writeJUnit)
	gomega.RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t, suiteName, ginkgo.Label(suiteName))
}

// Package-level aliases so the suite can call these Gomega helpers without
// the package qualifier.
var (
	Ω            = gomega.Ω
	HaveOccurred = gomega.HaveOccurred
)
Loading

0 comments on commit aa20c7b

Please sign in to comment.