Skip to content

Commit

Permalink
[YUNIKORN-2925] Remove internal object from application REST info (#999)
Browse files Browse the repository at this point in the history
Remove the TrackedResource object from the REST response for the
application. Moved the tracked resources into a separate object inside
the app DAO, as we have done for other complex pieces.
Cleanup exposure of the summary object to be testing only.
Add DAO retrieval of TrackedResource type for the web handler.

Closes: #999

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
wilfred-s authored and craigcondit committed Nov 22, 2024
1 parent ebb8b1c commit 7c99e6b
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 190 deletions.
48 changes: 30 additions & 18 deletions pkg/common/resources/tracked_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"strings"
"time"

"golang.org/x/exp/maps"

"github.com/apache/yunikorn-core/pkg/locking"
)

Expand All @@ -44,10 +46,6 @@ func NewTrackedResource() *TrackedResource {
// NewTrackedResourceFromMap creates NewTrackedResource from the given map.
// Using for Testing purpose only.
func NewTrackedResourceFromMap(m map[string]map[string]Quantity) *TrackedResource {
if m == nil {
return NewTrackedResource()
}

trackedMap := make(map[string]*Resource)
for inst, inner := range m {
trackedMap[inst] = NewResourceFromMap(inner)
Expand All @@ -56,6 +54,9 @@ func NewTrackedResourceFromMap(m map[string]map[string]Quantity) *TrackedResourc
}

func (tr *TrackedResource) String() string {
if tr == nil {
return "TrackedResource{}"
}
tr.RLock()
defer tr.RUnlock()

Expand Down Expand Up @@ -85,8 +86,7 @@ func (tr *TrackedResource) Clone() *TrackedResource {

// AggregateTrackedResource aggregates resource usage to TrackedResourceMap[instType].
// The time the given resource used is the delta between the resource createTime and currentTime.
func (tr *TrackedResource) AggregateTrackedResource(instType string,
resource *Resource, bindTime time.Time) {
func (tr *TrackedResource) AggregateTrackedResource(instType string, resource *Resource, bindTime time.Time) {
if resource == nil {
return
}
Expand All @@ -105,25 +105,37 @@ func (tr *TrackedResource) AggregateTrackedResource(instType string,
tr.TrackedResourceMap[instType] = aggregatedResourceTime
}

func EqualsTracked(left, right *TrackedResource) bool {
if left == right {
return true
// EqualsDAO compares the TrackedResource against the DAO map that was created of the resource.
// Test use only
func (tr *TrackedResource) EqualsDAO(right map[string]map[string]int64) bool {
if tr == nil {
return len(right) == 0
}

if left == nil || right == nil {
tr.RLock()
defer tr.RUnlock()
if len(tr.TrackedResourceMap) != len(right) {
return false
}

for k, v := range left.TrackedResourceMap {
inner, ok := right.TrackedResourceMap[k]
if !ok {
for k, v := range tr.TrackedResourceMap {
if inner, ok := right[k]; !ok {
return false
}

if !Equals(v, inner) {
} else if !maps.Equal(v.DAOMap(), inner) {
return false
}
}

return true
}

// DAOMap converts the TrackedResource into a map structure for use in the REST API.
func (tr *TrackedResource) DAOMap() map[string]map[string]int64 {
daoMAP := make(map[string]map[string]int64)
if tr != nil {
tr.RLock()
defer tr.RUnlock()
for k, res := range tr.TrackedResourceMap {
daoMAP[k] = res.DAOMap()
}
}
return daoMAP
}
121 changes: 60 additions & 61 deletions pkg/common/resources/tracked_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,73 +29,55 @@ import (
"gotest.tools/v3/assert"
)

func CheckLenOfTrackedResource(res *TrackedResource, expected int) (bool, string) {
if got := len(res.TrackedResourceMap); expected == 0 && (res == nil || got != expected) {
return false, fmt.Sprintf("input with empty and nil should be a empty tracked resource: Expected %d, got %d", expected, got)
func CheckTrackedResource(res *TrackedResource, trMap map[string]map[string]Quantity) error {
if len(res.TrackedResourceMap) != len(trMap) {
return fmt.Errorf("input with empty and nil should be a empty tracked resource: Expected %d, got %d", len(trMap), len(res.TrackedResourceMap))
}
if got := len(res.TrackedResourceMap); got != expected {
return false, fmt.Sprintf("Length of tracked resources is wrong: Expected %d, got %d", expected, got)
}
return true, ""
}

func CheckResourceValueOfTrackedResource(res *TrackedResource, expected map[string]map[string]Quantity) (bool, string) {
for instanceType, expected := range expected {
for instanceType, expect := range trMap {
trackedRes := res.TrackedResourceMap[instanceType]
expectedRes := NewResourceFromMap(expected)
expectedRes := NewResourceFromMap(expect)
if !Equals(trackedRes, expectedRes) {
return false, fmt.Sprintf("instance type %s, expected %s, got %s", instanceType, trackedRes, expectedRes)
return fmt.Errorf("instance type %s, expected %s, got %s", instanceType, trackedRes, expectedRes)
}
}
return true, ""
return nil
}

func TestNewTrackedResourceFromMap(t *testing.T) {
type outputs struct {
length int
trackedResources map[string]map[string]Quantity
}
var tests = []struct {
caseName string
input map[string]map[string]Quantity
expected outputs
trMap map[string]map[string]Quantity
}{
{
"nil",
nil,
outputs{0, map[string]map[string]Quantity{}},
map[string]map[string]Quantity{},
},
{
"empty",
map[string]map[string]Quantity{},
outputs{0, map[string]map[string]Quantity{}},
},
map[string]map[string]Quantity{}},
{
"tracked resources of one instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 1}},
outputs{1, map[string]map[string]Quantity{"instanceType1": {"first": 1}}},
map[string]map[string]Quantity{"instanceType1": {"first": 1}},
},
{
"tracked resources of two instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}},
outputs{2, map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}}},
map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}},
},
{
"Multiple tracked resources for one instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}},
outputs{1, map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}}},
map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}},
},
}
for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
res := NewTrackedResourceFromMap(tt.input)
if ok, err := CheckLenOfTrackedResource(res, tt.expected.length); !ok {
t.Error(err)
} else {
if ok, err := CheckResourceValueOfTrackedResource(res, tt.expected.trackedResources); !ok {
t.Error(err)
}
}
assert.NilError(t, CheckTrackedResource(res, tt.trMap))
})
}
}
Expand Down Expand Up @@ -132,6 +114,11 @@ func TestTrackedResourceClone(t *testing.T) {
}

// case: tracked resource is nil
defer func() {
if r := recover(); r != nil {
t.Fatal("Clone panic on nil TrackedResource")
}
}()
tr := (*TrackedResource)(nil)
cloned := tr.Clone()
assert.Assert(t, cloned == nil)
Expand Down Expand Up @@ -220,58 +207,61 @@ func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
}

// case: resource is nil
defer func() {
if r := recover(); r != nil {
t.Fatal("Panic on nil map for new TrackedResource")
}
}()
tr := NewTrackedResourceFromMap(nil)
tr.AggregateTrackedResource("instanceType1", nil, time.Now().Add(-time.Minute))
assert.Assert(t, tr.TrackedResourceMap != nil && len(tr.TrackedResourceMap) == 0)
}

func TestEqualsTracked(t *testing.T) {
type inputs struct {
base map[string]map[string]Quantity
compare map[string]map[string]Quantity
}
var tests = []struct {
caseName string
input inputs
base map[string]map[string]Quantity
compare map[string]map[string]int64
expected bool
}{
{"simple cases (nil checks)", inputs{nil, nil}, true},
{"simple cases (nil checks)", inputs{map[string]map[string]Quantity{}, nil}, false},
{"same first and second level keys and different resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"val": 0}}},
{"nil inputs", nil, nil, true},
{"both empty", map[string]map[string]Quantity{}, map[string]map[string]int64{}, true},
{"empty tracked nil dao", map[string]map[string]Quantity{}, nil, true},
{"empty tracked",
map[string]map[string]Quantity{},
map[string]map[string]int64{"first": {"val": 0}},
false,
},
{"different first-level key, same second-level key, same resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"second": {"val": 10}}},
false},
{"same first-level key, different second-level key, same resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"value": 10}}},
{"same keys different values",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"val": 0}},
false,
},
{"different instance type",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"second": {"val": 10}},
false},
{"same first-level key, second has larger sub-level map",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"val": 10, "sum": 7}}},
{"different resource type",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"value": 10}},
false},
{"same first-level key, first has larger sub-level map",
inputs{map[string]map[string]Quantity{"first": {"val": 10, "sum": 7}}, map[string]map[string]Quantity{"first": {"val": 10}}},
{"different resource count",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"val": 10, "sum": 7}},
false},
{"same keys and values",
inputs{map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}}, map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}}},
map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}},
map[string]map[string]int64{"x": {"val": 10, "sum": 7}},
true},
}
for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
var base, compare *TrackedResource
if tt.input.base != nil {
base = NewTrackedResourceFromMap(tt.input.base)
if tt.base != nil {
base = NewTrackedResourceFromMap(tt.base)
}
if tt.input.compare != nil {
compare = NewTrackedResourceFromMap(tt.input.compare)
}

result := EqualsTracked(base, compare)
assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, base, compare)

result = EqualsTracked(compare, base)
assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, compare, base)
assert.Equal(t, base.EqualsDAO(tt.compare), tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, base, compare)
})
}
}
Expand Down Expand Up @@ -311,4 +301,13 @@ func TestTrackedResourceString(t *testing.T) {
})
expected = "TrackedResource{instanceType1:cpu=10,instanceType1:memory=20,instanceType2:memory=15}"
assert.Equal(t, sortTrackedResourceString(expected), sortTrackedResourceString(tr3.String()))

defer func() {
if r := recover(); r != nil {
t.Fatal("String panic on nil TrackedResource")
}
}()
tr := (*TrackedResource)(nil)
str := tr.String()
assert.Equal(t, str, "TrackedResource{}")
}
78 changes: 46 additions & 32 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,6 @@ type Application struct {
locking.RWMutex
}

func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
state := sa.stateMachine.Current()
resourceUsage := sa.usedResource.Clone()
preemptedUsage := sa.preemptedResource.Clone()
placeHolderUsage := sa.placeholderResource.Clone()
appSummary := &ApplicationSummary{
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: state,
RmID: rmID,
ResourceUsage: resourceUsage,
PreemptedResource: preemptedUsage,
PlaceholderResource: placeHolderUsage,
}
return appSummary
}

func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application {
app := &Application{
ApplicationID: siApp.ApplicationID,
Expand Down Expand Up @@ -2127,21 +2104,58 @@ func (sa *Application) cleanupTrackedResource() {
sa.preemptedResource = nil
}

func (sa *Application) CleanupTrackedResource() {
// GetApplicationSummary locked version to get the application summary
// Exposed for test only
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
return sa.getApplicationSummary(rmID)
}

func (sa *Application) getApplicationSummary(rmID string) *ApplicationSummary {
return &ApplicationSummary{
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: sa.stateMachine.Current(),
RmID: rmID,
ResourceUsage: sa.usedResource.Clone(),
PreemptedResource: sa.preemptedResource.Clone(),
PlaceholderResource: sa.placeholderResource.Clone(),
}
}

// LogAppSummary log the summary details for the application if it has run at any point in time.
// The application summary only contains correct data when the application is in the Completed state.
// Logging the data in any other state will show incomplete or inconsistent data.
// After the data is logged the objects are cleaned up to lower overhead of Completed application tracking.
func (sa *Application) LogAppSummary(rmID string) {
sa.Lock()
defer sa.Unlock()
if !sa.startTime.IsZero() {
appSummary := sa.getApplicationSummary(rmID)
appSummary.DoLogging()
}
sa.cleanupTrackedResource()
}

func (sa *Application) LogAppSummary(rmID string) {
if sa.startTime.IsZero() {
return
// GetTrackedDAOMap returns the tracked resources type specified in which as a DAO similar to the normal resources.
func (sa *Application) GetTrackedDAOMap(which string) map[string]map[string]int64 {
sa.RLock()
defer sa.RUnlock()
switch which {
case "usedResource":
return sa.usedResource.DAOMap()
case "preemptedResource":
return sa.preemptedResource.DAOMap()
case "placeholderResource":
return sa.placeholderResource.DAOMap()
default:
return map[string]map[string]int64{}
}
appSummary := sa.GetApplicationSummary(rmID)
appSummary.DoLogging()
appSummary.ResourceUsage = nil
appSummary.PreemptedResource = nil
appSummary.PlaceholderResource = nil
}

func (sa *Application) HasPlaceholderAllocation() bool {
Expand Down
Loading

0 comments on commit 7c99e6b

Please sign in to comment.