Skip to content

Commit

Permalink
[YUNIKORN-2042] REST API for specific queue
Browse files Browse the repository at this point in the history
  • Loading branch information
steinsgateted committed Oct 14, 2023
1 parent 63f9731 commit 0a1008a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 93 deletions.
27 changes: 0 additions & 27 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,33 +589,6 @@ func (sq *Queue) CheckAdminAccess(user security.UserGroup) bool {
return allow
}

// GetQueueInfo returns the queue hierarchy as an object for a REST call.
// This object is used by the deprecated REST API and is succeeded by the GetPartitionQueueDAOInfo call.
func (sq *Queue) GetQueueInfo() dao.QueueDAOInfo {
queueInfo := dao.QueueDAOInfo{}
for _, child := range sq.GetCopyOfChildren() {
queueInfo.ChildQueues = append(queueInfo.ChildQueues, child.GetQueueInfo())
}

// children are done we can now lock just this queue.
sq.RLock()
defer sq.RUnlock()
queueInfo.QueueName = sq.Name
queueInfo.Status = sq.stateMachine.Current()
queueInfo.Capacities = dao.QueueCapacity{
Capacity: sq.guaranteedResource.DAOMap(),
MaxCapacity: sq.maxResource.DAOMap(),
UsedCapacity: sq.allocatedResource.DAOMap(),
AbsUsedCapacity: resources.CalculateAbsUsedCapacity(
sq.maxResource, sq.allocatedResource).DAOMap(),
}
queueInfo.Properties = make(map[string]string)
for k, v := range sq.properties {
queueInfo.Properties[k] = v
}
return queueInfo
}

// GetPartitionQueueDAOInfo returns the queue hierarchy as an object for a REST call.
func (sq *Queue) GetPartitionQueueDAOInfo() dao.PartitionQueueDAOInfo {
queueInfo := dao.PartitionQueueDAOInfo{}
Expand Down
61 changes: 0 additions & 61 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -1628,24 +1627,6 @@ func TestGetQueueInfo(t *testing.T) {
assert.NilError(t, err, "failed to create queue: %v", err)
err = child1.IncAllocatedResource(child1used, false)
assert.NilError(t, err, "failed to increment allocated resource: %v", err, err)

var child2 *Queue
child2, err = createManagedQueue(parent, "child2", true, nil)
assert.NilError(t, err, "failed to create child queue: %v", err)
child2.SetMaxResource(resources.NewResource())
rootDaoInfo := root.GetQueueInfo()

compareQueueInfoWithDAO(t, root, rootDaoInfo)
parentDaoInfo := rootDaoInfo.ChildQueues[0]
compareQueueInfoWithDAO(t, parent, parentDaoInfo)
for _, childDao := range parentDaoInfo.ChildQueues {
name := childDao.QueueName
child := parent.children[name]
if child == nil {
t.Fail()
}
compareQueueInfoWithDAO(t, child, childDao)
}
}

func TestGetQueueInfoPropertiesSet(t *testing.T) {
Expand Down Expand Up @@ -1673,48 +1654,6 @@ func TestGetQueueInfoPropertiesSet(t *testing.T) {
// dynamic leaf queue picks up from parent
_, err = createDynamicQueue(parent, "child3", true)
assert.NilError(t, err, "failed to create child queue: %v", err)

rootDaoInfo := root.GetQueueInfo()

compareQueueInfoWithDAO(t, root, rootDaoInfo)
parentDaoInfo := rootDaoInfo.ChildQueues[0]
compareQueueInfoWithDAO(t, parent, parentDaoInfo)
for _, childDao := range parentDaoInfo.ChildQueues {
name := childDao.QueueName
child := parent.children[name]
if child == nil {
t.Fail()
}
compareQueueInfoWithDAO(t, child, childDao)
}
}

func compareQueueInfoWithDAO(t *testing.T, queue *Queue, dao dao.QueueDAOInfo) {
assert.Equal(t, queue.Name, dao.QueueName)
assert.Equal(t, len(queue.children), len(dao.ChildQueues))
assert.Equal(t, queue.stateMachine.Current(), dao.Status)
emptyRes := map[string]int64{}
if queue.allocatedResource == nil {
assert.DeepEqual(t, emptyRes, dao.Capacities.UsedCapacity)
} else {
assert.DeepEqual(t, queue.allocatedResource.DAOMap(), dao.Capacities.UsedCapacity)
}
if queue.maxResource == nil {
assert.DeepEqual(t, emptyRes, dao.Capacities.MaxCapacity)
} else {
assert.DeepEqual(t, queue.maxResource.DAOMap(), dao.Capacities.MaxCapacity)
}
if queue.guaranteedResource == nil {
assert.DeepEqual(t, emptyRes, dao.Capacities.Capacity)
} else {
assert.DeepEqual(t, queue.guaranteedResource.DAOMap(), dao.Capacities.Capacity)
}
assert.Equal(t, len(queue.properties), len(dao.Properties))
if len(queue.properties) > 0 {
for k, v := range queue.properties {
assert.Equal(t, v, dao.Properties[k])
}
}
}

func TestSupportTaskGroup(t *testing.T) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,6 @@ func (pc *PartitionContext) getQueueInternal(name string) *objects.Queue {
return queue
}

// Get the queue info for the whole queue structure to pass to the webservice
func (pc *PartitionContext) GetQueueInfo() dao.QueueDAOInfo {
return pc.root.GetQueueInfo()
}

// Get the queue info for the whole queue structure to pass to the webservice
func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
var PartitionQueueDAOInfo = dao.PartitionQueueDAOInfo{}
Expand All @@ -480,6 +475,13 @@ func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
return PartitionQueueDAOInfo
}

func (pc *PartitionContext) GetPartitionQueue(queue *objects.Queue) dao.PartitionQueueDAOInfo {
var PartitionQueueDAOInfo = dao.PartitionQueueDAOInfo{}
PartitionQueueDAOInfo = queue.GetPartitionQueueDAOInfo()
PartitionQueueDAOInfo.Partition = common.GetPartitionNameWithoutClusterID(pc.Name)
return PartitionQueueDAOInfo
}

// Create the recovery queue.
func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) {
return objects.NewRecoveryQueue(pc.root)
Expand Down
29 changes: 29 additions & 0 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,31 @@ func getPartitionQueues(w http.ResponseWriter, r *http.Request) {
}
}

func getPartitionQueue(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext != nil {
queueName := vars.ByName("queue")
queue := partitionContext.GetQueue(queueName)
if queue == nil {
buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusBadRequest)
return
}
queueDao := getPartitionQueueDAO(partitionContext, queue)
if err := json.NewEncoder(w).Encode(queueDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
} else {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
}
}

func getPartitionNodes(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
Expand Down Expand Up @@ -750,6 +775,10 @@ func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.P
return result
}

func getPartitionQueueDAO(partitionContext *scheduler.PartitionContext, queue *objects.Queue) dao.PartitionQueueDAOInfo {
return partitionContext.GetPartitionQueue(queue)
}

func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
var result []*dao.ClusterDAOInfo

Expand Down
6 changes: 6 additions & 0 deletions pkg/webservice/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ var webRoutes = routes{
"/ws/v1/partition/:partition/queues",
getPartitionQueues,
},
route{
"Scheduler",
"GET",
"/ws/v1/partition/:partition/queue/:queue",
getPartitionQueue,
},
route{
"Scheduler",
"GET",
Expand Down

0 comments on commit 0a1008a

Please sign in to comment.