Skip to content

Commit

Permalink
[YUNIKORN-998] expose gang related info in application REST info
Browse files Browse the repository at this point in the history
  • Loading branch information
steinsgateted committed Mar 10, 2022
1 parent 2f7ee5b commit d58052f
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 17 deletions.
4 changes: 4 additions & 0 deletions pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,10 @@ func (cc *ClusterContext) processAsks(request *si.AllocationRequest) {
zap.String("applicationID", siAsk.ApplicationID),
zap.String("askKey", siAsk.AllocationKey),
zap.Error(err))
} else {
if siAsk.Placeholder {
partition.applications[siAsk.ApplicationID].SetPlaceholderData(siAsk.TaskGroupName, resources.NewResourceFromProto(siAsk.ResourceAsk), common.GetRequiredNodeFromTag(siAsk.Tags))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,6 @@ func (a *Allocation) IsReleased() bool {
}

// GetTaskGroup returns the task group name if set.
func (a *Allocation) getTaskGroup() string {
func (a *Allocation) GetTaskGroup() string {
return a.taskGroupName
}
2 changes: 1 addition & 1 deletion pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,5 @@ func TestNewAllocFromSI(t *testing.T) {
alloc = NewAllocationFromSI(allocSI)
assert.Assert(t, alloc != nilAlloc, "placeholder ask creation failed unexpectedly")
assert.Assert(t, alloc.IsPlaceholder(), "ask should have been a placeholder")
assert.Equal(t, alloc.getTaskGroup(), "testgroup", "TaskGroupName not set as expected")
assert.Equal(t, alloc.GetTaskGroup(), "testgroup", "TaskGroupName not set as expected")
}
36 changes: 32 additions & 4 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ const (
AppTagStateAwareDisable string = "application.stateaware.disable"
)

// PlaceholderData is the per task group placeholder bookkeeping that an
// Application keeps in its PlaceholderDatas map for gang scheduling info.
type PlaceholderData struct {
// TaskGroupName is the task group this entry describes; SetPlaceholderData
// uses the same value as the key in Application.PlaceholderDatas.
TaskGroupName string
// RequiredNode is the required node value handed to SetPlaceholderData
// (processAsks derives it from the ask tags via common.GetRequiredNodeFromTag).
RequiredNode string
// AllocatedResource is the resource handed to SetPlaceholderData
// (processAsks builds it from the ask's ResourceAsk).
AllocatedResource *resources.Resource
// Count is incremented by SetPlaceholderData on each call for this task group.
Count int64
// Replaced is incremented in tryPlaceholderAllocate each time a placeholder
// of this task group is marked released because it is being replaced.
Replaced int64
}

type Application struct {
ApplicationID string
Partition string
QueuePath string
SubmissionTime time.Time
ApplicationID string
Partition string
QueuePath string
SubmissionTime time.Time
PlaceholderDatas map[string]*PlaceholderData

// Private fields need protection
queue *Queue // queue the application is running in
Expand Down Expand Up @@ -892,6 +901,11 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
alloc.Result = Replaced
// mark placeholder as released
ph.released = true
// store the number of placeholders that have been replaced so far
if sa.PlaceholderDatas != nil {
sa.PlaceholderDatas[ph.taskGroupName].Replaced++
}
// The number of replaced placeHolder
_, err := sa.updateAskRepeatInternal(request, -1)
if err != nil {
log.Logger().Warn("ask repeat update failed unexpectedly",
Expand Down Expand Up @@ -1520,3 +1534,17 @@ func (sa *Application) GetRejectedMessage() string {
defer sa.RUnlock()
return sa.rejectedMessage
}

// SetPlaceholderData records one placeholder ask for the given task group.
//
// The first call for a task group creates its entry from taskGroupName,
// allocatedResource and requiredNode. Every call, including the first one,
// increments the entry's Count. An entry that already exists is never
// replaced, so its accumulated Count and Replaced values are preserved when
// further placeholder asks arrive for the same task group.
//
// The application lock is held for the duration of the update.
func (sa *Application) SetPlaceholderData(taskGroupName string, allocatedResource *resources.Resource, requiredNode string) {
	sa.Lock()
	defer sa.Unlock()
	// Lazily create the map: writing to a nil map would panic.
	if sa.PlaceholderDatas == nil {
		sa.PlaceholderDatas = make(map[string]*PlaceholderData)
	}
	// Only create the entry when it is missing. Unconditionally assigning a
	// fresh struct here would reset Count to zero (and wipe Replaced) on every
	// ask, leaving Count stuck at 1 no matter how many placeholders were asked.
	data, ok := sa.PlaceholderDatas[taskGroupName]
	if !ok || data == nil {
		data = &PlaceholderData{
			TaskGroupName:     taskGroupName,
			RequiredNode:      requiredNode,
			AllocatedResource: allocatedResource,
		}
		sa.PlaceholderDatas[taskGroupName] = data
	}
	data.Count++
}
33 changes: 22 additions & 11 deletions pkg/webservice/dao/application_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@

package dao

import "github.com/apache/incubator-yunikorn-core/pkg/common/resources"

// ApplicationsDAOInfo wraps a list of ApplicationDAOInfo entries; the list is
// serialised under the "applications" JSON key.
type ApplicationsDAOInfo struct {
Applications []ApplicationDAOInfo `json:"applications"`
}

type ApplicationDAOInfo struct {
ApplicationID string `json:"applicationID"`
UsedResource string `json:"usedResource"`
MaxUsedResource string `json:"maxUsedResource"`
Partition string `json:"partition"`
QueueName string `json:"queueName"`
SubmissionTime int64 `json:"submissionTime"`
FinishedTime *int64 `json:"finishedTime"`
Allocations []AllocationDAOInfo `json:"allocations"`
State string `json:"applicationState"`
User string `json:"user"`
RejectedMessage string `json:"rejectedMessage"`
ApplicationID string `json:"applicationID"`
UsedResource string `json:"usedResource"`
MaxUsedResource string `json:"maxUsedResource"`
Partition string `json:"partition"`
QueueName string `json:"queueName"`
SubmissionTime int64 `json:"submissionTime"`
FinishedTime *int64 `json:"finishedTime"`
Allocations []AllocationDAOInfo `json:"allocations"`
State string `json:"applicationState"`
User string `json:"user"`
RejectedMessage string `json:"rejectedMessage"`
PlaceholderData []PlaceholderDAOInfo `json:"placeholderData"`
}

type AllocationDAOInfo struct {
Expand All @@ -47,3 +50,11 @@ type AllocationDAOInfo struct {
ApplicationID string `json:"applicationId"`
Partition string `json:"partition"`
}

// PlaceholderDAOInfo is the REST (JSON) representation of placeholder data for
// one task group. getApplicationJSON fills each field from the matching field
// of the application's PlaceholderDatas entry for that task group.
type PlaceholderDAOInfo struct {
// TaskGroupName is the task group name taken from the placeholder allocation.
TaskGroupName string `json:"taskGroupName"`
// RequiredNode is copied from PlaceholderData.RequiredNode.
RequiredNode string `json:"requiredNode"`
// AllocatedResource is copied (same pointer) from PlaceholderData.AllocatedResource.
AllocatedResource *resources.Resource `json:"allocatedResource"`
// Count is copied from PlaceholderData.Count.
Count int64 `json:"count"`
// Replaced is copied from PlaceholderData.Replaced.
Replaced int64 `json:"replaced"`
}
15 changes: 15 additions & 0 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func getPartitionJSON(partition *scheduler.PartitionContext) *dao.PartitionDAOIn
func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {
allocations := app.GetAllAllocations()
allocationInfos := make([]dao.AllocationDAOInfo, 0, len(allocations))
placeHolderInfos := make([]dao.PlaceholderDAOInfo, 0, len(app.PlaceholderDatas))

for _, alloc := range allocations {
allocInfo := dao.AllocationDAOInfo{
AllocationKey: alloc.AllocationKey,
Expand All @@ -290,6 +292,18 @@ func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {
Partition: alloc.PartitionName,
}
allocationInfos = append(allocationInfos, allocInfo)

if alloc.IsPlaceholder() {
taskGroupName := alloc.GetTaskGroup()
placeHolderInfo := dao.PlaceholderDAOInfo{
TaskGroupName: taskGroupName,
RequiredNode: app.PlaceholderDatas[taskGroupName].RequiredNode,
AllocatedResource: app.PlaceholderDatas[taskGroupName].AllocatedResource,
Count: app.PlaceholderDatas[taskGroupName].Count,
Replaced: app.PlaceholderDatas[taskGroupName].Replaced,
}
placeHolderInfos = append(placeHolderInfos, placeHolderInfo)
}
}

return &dao.ApplicationDAOInfo{
Expand All @@ -304,6 +318,7 @@ func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {
State: app.CurrentState(),
User: app.GetUser().User,
RejectedMessage: app.GetRejectedMessage(),
PlaceholderData: placeHolderInfos,
}
}

Expand Down

0 comments on commit d58052f

Please sign in to comment.