From 617e9e5e9f09630d7cf4c7b4c7b7afb95876382b Mon Sep 17 00:00:00 2001 From: HuangTing-Yao Date: Mon, 18 May 2020 11:20:10 +1000 Subject: [PATCH] [YUNIKORN-153] Move SI functions to si_helper (#118) Scheduler interface functions are currently part of the resource code. The resources and SI are not related and should not be mixed in the source code. This is not a change in functionality. Fixes: #118 --- pkg/common/resource.go | 121 ----------------------------- pkg/common/resource_test.go | 24 ------ pkg/common/si_helper.go | 144 +++++++++++++++++++++++++++++++++++ pkg/common/si_helper_test.go | 48 ++++++++++++ 4 files changed, 192 insertions(+), 145 deletions(-) create mode 100644 pkg/common/si_helper.go create mode 100644 pkg/common/si_helper_test.go diff --git a/pkg/common/resource.go b/pkg/common/resource.go index c7ce855d5..af7093343 100644 --- a/pkg/common/resource.go +++ b/pkg/common/resource.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" - "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" ) @@ -99,126 +98,6 @@ func getResource(resourceList v1.ResourceList) *si.Resource { return resources.Build() } -func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource) si.UpdateRequest { - ask := si.AllocationAsk{ - AllocationKey: taskID, - ResourceAsk: resource, - ApplicationID: appID, - MaxAllocations: 1, - } - - result := si.UpdateRequest{ - Asks: []*si.AllocationAsk{&ask}, - NewSchedulableNodes: nil, - UpdatedNodes: nil, - UtilizationReports: nil, - RmID: conf.GetSchedulerConf().ClusterID, - } - - return result -} - -func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRequest { - toReleases := make([]*si.AllocationAskReleaseRequest, 0) - toReleases = append(toReleases, &si.AllocationAskReleaseRequest{ - ApplicationID: appID, - Allocationkey: taskId, - PartitionName: partition, - Message: "task request is canceled", - }) - - releaseRequest := si.AllocationReleasesRequest{ - AllocationAsksToRelease: toReleases, - } - - result := si.UpdateRequest{ - Releases: &releaseRequest, - RmID: conf.GetSchedulerConf().ClusterID, - } - - return result -} - -func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition string) si.UpdateRequest { - toReleases := make([]*si.AllocationReleaseRequest, 0) - toReleases = append(toReleases, &si.AllocationReleaseRequest{ - ApplicationID: appID, - UUID: allocUUID, - PartitionName: partition, - Message: "task completed", - }) - - releaseRequest := si.AllocationReleasesRequest{ - AllocationsToRelease: toReleases, - } - - result := si.UpdateRequest{ - Releases: &releaseRequest, - RmID: conf.GetSchedulerConf().ClusterID, - } - - return result -} - -func CreateUpdateRequestForNewNode(node Node) si.UpdateRequest { - // Use node's name as the NodeID, this is because when bind pod to node, - // name of node is required but uid is optional. - nodeInfo := &si.NewNodeInfo{ - NodeID: node.name, - SchedulableResource: node.capacity, - // TODO is this required? - Attributes: map[string]string{ - DefaultNodeAttributeHostNameKey: node.name, - DefaultNodeAttributeRackNameKey: DefaultRackName, - }, - } - - nodes := make([]*si.NewNodeInfo, 1) - nodes[0] = nodeInfo - request := si.UpdateRequest{ - NewSchedulableNodes: nodes, - RmID: conf.GetSchedulerConf().ClusterID, - } - return request -} - -func CreateUpdateRequestForUpdatedNode(node Node) si.UpdateRequest { - // Currently only includes resource in the update request - nodeInfo := &si.UpdateNodeInfo{ - NodeID: node.name, - Attributes: make(map[string]string), - SchedulableResource: node.capacity, - OccupiedResource: node.occupied, - Action: si.UpdateNodeInfo_UPDATE, - } - - nodes := make([]*si.UpdateNodeInfo, 1) - nodes[0] = nodeInfo - request := si.UpdateRequest{ - UpdatedNodes: nodes, - RmID: conf.GetSchedulerConf().ClusterID, - } - return request -} - -func CreateUpdateRequestForDeleteNode(node Node) si.UpdateRequest { - deletedNodes := make([]*si.UpdateNodeInfo, 1) - nodeInfo := &si.UpdateNodeInfo{ - NodeID: node.name, - SchedulableResource: node.capacity, - OccupiedResource: node.occupied, - Attributes: make(map[string]string), - Action: si.UpdateNodeInfo_DECOMISSION, - } - - deletedNodes[0] = nodeInfo - request := si.UpdateRequest{ - UpdatedNodes: deletedNodes, - RmID: conf.GetSchedulerConf().ClusterID, - } - return request -} - func Equals(left *si.Resource, right *si.Resource) bool { if left == right { return true diff --git a/pkg/common/resource_test.go b/pkg/common/resource_test.go index 1530c8bd0..a60128ff2 100644 --- a/pkg/common/resource_test.go +++ b/pkg/common/resource_test.go @@ -246,30 +246,6 @@ func TestNodeResource(t *testing.T) { assert.Equal(t, result.Resources[CPU].GetValue(), int64(14500)) } -func TestCreateReleaseAllocationRequest(t *testing.T) { - request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default") - assert.Assert(t, request.Releases != nil) - assert.Assert(t, request.Releases.AllocationsToRelease != nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease == nil) - assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0) - assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationsToRelease[0].UUID, "alloc01") - assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") -} - -func TestCreateReleaseAskRequestForTask(t *testing.T) { - request := CreateReleaseAskRequestForTask("app01", "task01", "default") - assert.Assert(t, request.Releases != nil) - assert.Assert(t, request.Releases.AllocationsToRelease == nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, len(request.Releases.AllocationsToRelease), 0) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].Allocationkey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") -} - func TestIsZero(t *testing.T) { r := NewResourceBuilder(). AddResource(Memory, 1). diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go new file mode 100644 index 000000000..ef3f52bb2 --- /dev/null +++ b/pkg/common/si_helper.go @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package common + +import ( + "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource) si.UpdateRequest { + ask := si.AllocationAsk{ + AllocationKey: taskID, + ResourceAsk: resource, + ApplicationID: appID, + MaxAllocations: 1, + } + + result := si.UpdateRequest{ + Asks: []*si.AllocationAsk{&ask}, + NewSchedulableNodes: nil, + UpdatedNodes: nil, + UtilizationReports: nil, + RmID: conf.GetSchedulerConf().ClusterID, + } + + return result +} + +func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRequest { + toReleases := make([]*si.AllocationAskReleaseRequest, 0) + toReleases = append(toReleases, &si.AllocationAskReleaseRequest{ + ApplicationID: appID, + Allocationkey: taskId, + PartitionName: partition, + Message: "task request is canceled", + }) + + releaseRequest := si.AllocationReleasesRequest{ + AllocationAsksToRelease: toReleases, + } + + result := si.UpdateRequest{ + Releases: &releaseRequest, + RmID: conf.GetSchedulerConf().ClusterID, + } + + return result +} + +func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition string) si.UpdateRequest { + toReleases := make([]*si.AllocationReleaseRequest, 0) + toReleases = append(toReleases, &si.AllocationReleaseRequest{ + ApplicationID: appID, + UUID: allocUUID, + PartitionName: partition, + Message: "task completed", + }) + + releaseRequest := si.AllocationReleasesRequest{ + AllocationsToRelease: toReleases, + } + + result := si.UpdateRequest{ + Releases: &releaseRequest, + RmID: conf.GetSchedulerConf().ClusterID, + } + + return result +} + +func CreateUpdateRequestForNewNode(node Node) si.UpdateRequest { + // Use node's name as the NodeID, this is because when bind pod to node, + // name of node is required but uid is optional. + nodeInfo := &si.NewNodeInfo{ + NodeID: node.name, + SchedulableResource: node.capacity, + // TODO is this required? + Attributes: map[string]string{ + DefaultNodeAttributeHostNameKey: node.name, + DefaultNodeAttributeRackNameKey: DefaultRackName, + }, + } + + nodes := make([]*si.NewNodeInfo, 1) + nodes[0] = nodeInfo + request := si.UpdateRequest{ + NewSchedulableNodes: nodes, + RmID: conf.GetSchedulerConf().ClusterID, + } + return request +} + +func CreateUpdateRequestForUpdatedNode(node Node) si.UpdateRequest { + // Currently only includes resource in the update request + nodeInfo := &si.UpdateNodeInfo{ + NodeID: node.name, + Attributes: make(map[string]string), + SchedulableResource: node.capacity, + OccupiedResource: node.occupied, + Action: si.UpdateNodeInfo_UPDATE, + } + + nodes := make([]*si.UpdateNodeInfo, 1) + nodes[0] = nodeInfo + request := si.UpdateRequest{ + UpdatedNodes: nodes, + RmID: conf.GetSchedulerConf().ClusterID, + } + return request +} + +func CreateUpdateRequestForDeleteNode(node Node) si.UpdateRequest { + deletedNodes := make([]*si.UpdateNodeInfo, 1) + nodeInfo := &si.UpdateNodeInfo{ + NodeID: node.name, + SchedulableResource: node.capacity, + OccupiedResource: node.occupied, + Attributes: make(map[string]string), + Action: si.UpdateNodeInfo_DECOMISSION, + } + + deletedNodes[0] = nodeInfo + request := si.UpdateRequest{ + UpdatedNodes: deletedNodes, + RmID: conf.GetSchedulerConf().ClusterID, + } + return request +} diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go new file mode 100644 index 000000000..8b5d97d59 --- /dev/null +++ b/pkg/common/si_helper_test.go @@ -0,0 +1,48 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package common + +import ( + "testing" + + "gotest.tools/assert" +) + +func TestCreateReleaseAllocationRequest(t *testing.T) { + request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default") + assert.Assert(t, request.Releases != nil) + assert.Assert(t, request.Releases.AllocationsToRelease != nil) + assert.Assert(t, request.Releases.AllocationAsksToRelease == nil) + assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) + assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0) + assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") + assert.Equal(t, request.Releases.AllocationsToRelease[0].UUID, "alloc01") + assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") +} + +func TestCreateReleaseAskRequestForTask(t *testing.T) { + request := CreateReleaseAskRequestForTask("app01", "task01", "default") + assert.Assert(t, request.Releases != nil) + assert.Assert(t, request.Releases.AllocationsToRelease == nil) + assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) + assert.Equal(t, len(request.Releases.AllocationsToRelease), 0) + assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].Allocationkey, "task01") + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") +}