From d58052f068ae631fc2529efb95e1ab127f266571 Mon Sep 17 00:00:00 2001 From: steinsgateted Date: Thu, 10 Mar 2022 18:17:26 +0800 Subject: [PATCH] [YUNIKORN-998] expose gang related info in application REST info --- pkg/scheduler/context.go | 4 +++ pkg/scheduler/objects/allocation.go | 2 +- pkg/scheduler/objects/allocation_test.go | 2 +- pkg/scheduler/objects/application.go | 36 +++++++++++++++++++++--- pkg/webservice/dao/application_info.go | 33 ++++++++++++++-------- pkg/webservice/handlers.go | 15 ++++++++++ 6 files changed, 75 insertions(+), 17 deletions(-) diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index 5b52c9cb6..2ad735da9 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -785,6 +785,10 @@ func (cc *ClusterContext) processAsks(request *si.AllocationRequest) { zap.String("applicationID", siAsk.ApplicationID), zap.String("askKey", siAsk.AllocationKey), zap.Error(err)) + } else { + if siAsk.Placeholder { + partition.applications[siAsk.ApplicationID].SetPlaceholderData(siAsk.TaskGroupName, resources.NewResourceFromProto(siAsk.ResourceAsk), common.GetRequiredNodeFromTag(siAsk.Tags)) + } } } diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index c4925ca44..a0f7013b9 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -159,6 +159,6 @@ func (a *Allocation) IsReleased() bool { } // getTaskGroup returns the task group name if set. -func (a *Allocation) getTaskGroup() string { +func (a *Allocation) GetTaskGroup() string { return a.taskGroupName } diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go index 66501c28e..810f94c07 100644 --- a/pkg/scheduler/objects/allocation_test.go +++ b/pkg/scheduler/objects/allocation_test.go @@ -133,5 +133,5 @@ func TestNewAllocFromSI(t *testing.T) { alloc = NewAllocationFromSI(allocSI) assert.Assert(t, alloc != nilAlloc, "placeholder ask creation failed unexpectedly") assert.Assert(t, alloc.IsPlaceholder(), "ask should have been a placeholder") - assert.Equal(t, alloc.getTaskGroup(), "testgroup", "TaskGroupName not set as expected") + assert.Equal(t, alloc.GetTaskGroup(), "testgroup", "TaskGroupName not set as expected") } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index c5d52803c..f8ac022e1 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -53,11 +53,20 @@ const ( AppTagStateAwareDisable string = "application.stateaware.disable" ) +type PlaceholderData struct { + TaskGroupName string + RequiredNode string + AllocatedResource *resources.Resource + Count int64 + Replaced int64 +} + type Application struct { - ApplicationID string - Partition string - QueuePath string - SubmissionTime time.Time + ApplicationID string + Partition string + QueuePath string + SubmissionTime time.Time + PlaceholderDatas map[string]*PlaceholderData // Private fields need protection queue *Queue // queue the application is running in @@ -892,6 +901,11 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, alloc.Result = Replaced // mark placeholder as released ph.released = true + // store number of palceHolders that have been replaced so far + if sa.PlaceholderDatas != nil { + sa.PlaceholderDatas[ph.taskGroupName].Replaced++ + } + // The number of replaced placeHolder _, err := sa.updateAskRepeatInternal(request, -1) if err != nil { log.Logger().Warn("ask repeat update failed unexpectedly", @@ -1520,3 +1534,17 @@ func (sa *Application) GetRejectedMessage() string { defer sa.RUnlock() return sa.rejectedMessage } + +func (sa *Application) SetPlaceholderData(taskGroupName string, allocatedResource *resources.Resource, requiredNode string) { + sa.Lock() + defer sa.Unlock() + if sa.PlaceholderDatas == nil { + sa.PlaceholderDatas = make(map[string]*PlaceholderData) + } + sa.PlaceholderDatas[taskGroupName] = &PlaceholderData{ + TaskGroupName: taskGroupName, + RequiredNode: requiredNode, + AllocatedResource: allocatedResource, + } + sa.PlaceholderDatas[taskGroupName].Count++ +} diff --git a/pkg/webservice/dao/application_info.go b/pkg/webservice/dao/application_info.go index 4be97387c..6e1c13da8 100644 --- a/pkg/webservice/dao/application_info.go +++ b/pkg/webservice/dao/application_info.go @@ -18,22 +18,25 @@ package dao +import "github.com/apache/incubator-yunikorn-core/pkg/common/resources" + type ApplicationsDAOInfo struct { Applications []ApplicationDAOInfo `json:"applications"` } type ApplicationDAOInfo struct { - ApplicationID string `json:"applicationID"` - UsedResource string `json:"usedResource"` - MaxUsedResource string `json:"maxUsedResource"` - Partition string `json:"partition"` - QueueName string `json:"queueName"` - SubmissionTime int64 `json:"submissionTime"` - FinishedTime *int64 `json:"finishedTime"` - Allocations []AllocationDAOInfo `json:"allocations"` - State string `json:"applicationState"` - User string `json:"user"` - RejectedMessage string `json:"rejectedMessage"` + ApplicationID string `json:"applicationID"` + UsedResource string `json:"usedResource"` + MaxUsedResource string `json:"maxUsedResource"` + Partition string `json:"partition"` + QueueName string `json:"queueName"` + SubmissionTime int64 `json:"submissionTime"` + FinishedTime *int64 `json:"finishedTime"` + Allocations []AllocationDAOInfo `json:"allocations"` + State string `json:"applicationState"` + User string `json:"user"` + RejectedMessage string `json:"rejectedMessage"` + PlaceholderData []PlaceholderDAOInfo `json:"placeholderData"` } type AllocationDAOInfo struct { @@ -47,3 +50,11 @@ type AllocationDAOInfo struct { ApplicationID string `json:"applicationId"` Partition string `json:"partition"` } + +type PlaceholderDAOInfo struct { + TaskGroupName string `json:"taskGroupName"` + RequiredNode string `json:"requiredNode"` + AllocatedResource *resources.Resource `json:"allocatedResource"` + Count int64 `json:"count"` + Replaced int64 `json:"replaced"` +} diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go index 8bea2e7c1..66efcb4cb 100644 --- a/pkg/webservice/handlers.go +++ b/pkg/webservice/handlers.go @@ -277,6 +277,8 @@ func getPartitionJSON(partition *scheduler.PartitionContext) *dao.PartitionDAOIn func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo { allocations := app.GetAllAllocations() allocationInfos := make([]dao.AllocationDAOInfo, 0, len(allocations)) + placeHolderInfos := make([]dao.PlaceholderDAOInfo, 0, len(app.PlaceholderDatas)) + for _, alloc := range allocations { allocInfo := dao.AllocationDAOInfo{ AllocationKey: alloc.AllocationKey, @@ -290,6 +292,18 @@ func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo { Partition: alloc.PartitionName, } allocationInfos = append(allocationInfos, allocInfo) + + if alloc.IsPlaceholder() { + taskGroupName := alloc.GetTaskGroup() + placeHolderInfo := dao.PlaceholderDAOInfo{ + TaskGroupName: taskGroupName, + RequiredNode: app.PlaceholderDatas[taskGroupName].RequiredNode, + AllocatedResource: app.PlaceholderDatas[taskGroupName].AllocatedResource, + Count: app.PlaceholderDatas[taskGroupName].Count, + Replaced: app.PlaceholderDatas[taskGroupName].Replaced, + } + placeHolderInfos = append(placeHolderInfos, placeHolderInfo) + } } return &dao.ApplicationDAOInfo{ @@ -304,6 +318,7 @@ func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo { State: app.CurrentState(), User: app.GetUser().User, RejectedMessage: app.GetRejectedMessage(), + PlaceholderData: placeHolderInfos, } }