From 79650f8eeb678250f60dfb4ddf252b85d02b1af1 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Tue, 10 Dec 2024 18:11:32 +0000 Subject: [PATCH 1/7] Scheduler: move nodedb tests to internaltypes Signed-off-by: Robert Smith --- .../scheduler/internaltypes/node_factory.go | 4 + .../internaltypes/resource_list_map_util.go | 25 ++ .../resource_list_map_util_test.go | 10 + internal/scheduler/nodedb/nodedb_test.go | 22 + .../scheduler/nodedb/nodeiteration_test.go | 398 ++++++++---------- .../scheduler/testfixtures/testfixtures.go | 139 ++++++ 6 files changed, 379 insertions(+), 219 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 22509f17ac3..5580b58fa13 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -104,3 +104,7 @@ func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobject } return result } + +func (f *NodeFactory) ResourceListFactory() *ResourceListFactory { + return f.resourceListFactory +} diff --git a/internal/scheduler/internaltypes/resource_list_map_util.go b/internal/scheduler/internaltypes/resource_list_map_util.go index b359703034a..e8a3e16085f 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util.go +++ b/internal/scheduler/internaltypes/resource_list_map_util.go @@ -57,3 +57,28 @@ func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList { } return result } + +func NewAllocatableByPriorityAndResourceType(priorities []int32, rl ResourceList) map[int32]ResourceList { + result := map[int32]ResourceList{} + for _, priority := range priorities { + result[priority] = rl + + } + return result +} + +// MarkAllocated indicates resources have been allocated to pods of priority p, +// hence reducing the resources allocatable to pods of priority p or lower. +func MarkAllocated(m map[int32]ResourceList, p int32, rs ResourceList) { + MarkAllocatable(m, p, rs.Negate()) +} + +// MarkAllocatable indicates resources have been released by pods of priority p, +// thus increasing the resources allocatable to pods of priority p or lower. 
+func MarkAllocatable(m map[int32]ResourceList, p int32, rs ResourceList) { + for priority, allocatableResourcesAtPriority := range m { + if priority <= p { + m[priority] = allocatableResourcesAtPriority.Add(rs) + } + } +} diff --git a/internal/scheduler/internaltypes/resource_list_map_util_test.go b/internal/scheduler/internaltypes/resource_list_map_util_test.go index f278eeefd72..046e9cdc235 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util_test.go +++ b/internal/scheduler/internaltypes/resource_list_map_util_test.go @@ -78,6 +78,16 @@ func TestRlMapRemoveZeros(t *testing.T) { assert.Equal(t, expected, RlMapRemoveZeros(input)) } +func TestNewAllocatableByPriorityAndResourceType(t *testing.T) { + factory := testFactory() + rl := testResourceList(factory, "2", "2Ki") + + result := NewAllocatableByPriorityAndResourceType([]int32{1, 2}, rl) + assert.Equal(t, 2, len(result)) + assert.Equal(t, int64(2000), result[1].GetByNameZeroIfMissing("cpu")) + assert.Equal(t, int64(2000), result[2].GetByNameZeroIfMissing("cpu")) +} + func testMapAllPositive(factory *ResourceListFactory) map[string]ResourceList { return map[string]ResourceList{ "a": testResourceList(factory, "1", "1Ki"), diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index d72f8d5cbd5..ca60a4a4417 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -918,6 +918,28 @@ func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { return nodeDb, nil } +func itNewNodeDbWithNodes(nodes []*internaltypes.Node) (*NodeDb, error) { + nodeDb, err := NewNodeDb( + testfixtures.TestPriorityClasses, + testfixtures.TestResources, + testfixtures.TestIndexedTaints, + testfixtures.TestIndexedNodeLabels, + testfixtures.TestWellKnownNodeTypes, + testfixtures.TestResourceListFactory, + ) + if err != nil { + return nil, err + } + txn := nodeDb.Txn(true) + for _, node := range nodes { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { + return nil, err + } + } + txn.Commit() + return nodeDb, nil +} + func BenchmarkNodeDbStringFromPodRequirementsNotMetReason(b *testing.B) { nodeDb := &NodeDb{ podRequirementsNotMetReasonStringCache: make(map[uint64]string, 128), diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 8ab626577dd..ec4db6e367f 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -21,25 +21,25 @@ import ( func TestNodesIterator(t *testing.T) { tests := map[string]struct { - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node }{ "1 node": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), }, "0 nodes": { - Nodes: testfixtures.N32CpuNodes(0, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(0, testfixtures.TestPriorities), }, "3 nodes": { - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { indexById := make(map[string]int) for i, node := range tc.Nodes { - indexById[node.Id] = i + indexById[node.GetId()] = i } - nodeDb, err := newNodeDbWithNodes(tc.Nodes) + nodeDb, err := itNewNodeDbWithNodes(tc.Nodes) if !assert.NoError(t, err) { return } @@ -49,10 +49,10 @@ func TestNodesIterator(t *testing.T) { } 
sortedNodes := slices.Clone(tc.Nodes) - slices.SortFunc(sortedNodes, func(a, b *schedulerobjects.Node) int { - if a.Id < b.Id { + slices.SortFunc(sortedNodes, func(a, b *internaltypes.Node) int { + if a.GetId() < b.GetId() { return -1 - } else if a.Id > b.Id { + } else if a.GetId() > b.GetId() { return 1 } else { return 0 @@ -60,7 +60,7 @@ func TestNodesIterator(t *testing.T) { }) expected := make([]int, len(sortedNodes)) for i, node := range sortedNodes { - expected[i] = indexById[node.Id] + expected[i] = indexById[node.GetId()] } actual := make([]int, 0) @@ -74,306 +74,237 @@ func TestNodesIterator(t *testing.T) { } func TestNodeTypeIterator(t *testing.T) { - const nodeTypeALabel = "a" - const nodeTypeBLabel = "b" - - nodeTypeAId := nodeTypeLabelToNodeTypeId(nodeTypeALabel) - gpuNodeTypeAId := gpuNodeTypeLabelToNodeTypeId(nodeTypeALabel) + nodeTypeA := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "a"}) + nodeTypeB := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "b"}) tests := map[string]struct { - nodes []*schedulerobjects.Node + nodes []*internaltypes.Node nodeTypeId uint64 priority int32 - resourceRequests schedulerobjects.ResourceList + resourceRequests internaltypes.ResourceList expected []int }{ "only yield nodes of the right nodeType": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: schedulerobjects.ResourceList{}, + resourceRequests: testfixtures.TestResourceListFactory.MakeAllZero(), expected: armadaslices.Concatenate( testfixtures.IntRange(0, 0), testfixtures.IntRange(3, 5), ), }, "filter nodes with insufficient resources and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 0, - 
resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 1, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + 
resourceRequests: cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "1Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "2Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("131Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "131Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "128Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("17"), - }}, - testfixtures.N32CpuNodes(1, 
testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - expected: []int{6, 1, 0}, + nodeTypeId: nodeTypeA.GetId(), + priority: 0, + resourceRequests: cpuMem("16", "128Gi"), + expected: []int{6, 1, 0}, }, "double-nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "1Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("2"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "2"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("5"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "5"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "2Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "2Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("514Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "514Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - 
testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "512Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("513Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "513Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("33"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpu("33"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: gpuNodeTypeAId, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - "nvidia.com/gpu": resource.MustParse("4"), - }}, - expected: []int{7, 5, 4, 2, 1, 0}, + nodeTypeId: nodeTypeA.GetId(), + priority: 0, + resourceRequests: cpuMemGpu("32", "512Gi", "4"), + expected: []int{7, 5, 4, 2, 1, 0}, }, } for name, tc := range tests { @@ -384,27 +315,19 @@ func TestNodeTypeIterator(t *testing.T) { entries := make([]*internaltypes.Node, len(tc.nodes)) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. 
- node.Id = fmt.Sprintf("%d", i) - - entry, err := internaltypes.FromSchedulerObjectsNode(node, - uint64(i), - nodeDb.indexedTaints, - nodeDb.indexedNodeLabels, - nodeDb.resourceListFactory) - - require.NoError(t, err) + newNodeId := fmt.Sprintf("%d", i) + entry := testfixtures.ItWithIdNodes(newNodeId, []*internaltypes.Node{node})[0] nodeDb.AddNodeToDb(entry) - entries[i] = entry } require.NoError(t, nodeDb.UpsertMany(entries)) indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) + assert.Nil(t, err) for i, resourceName := range nodeDb.indexedResources { - indexedResourceRequests[i], err = rr.GetByName(resourceName) + indexedResourceRequests[i], err = tc.resourceRequests.GetByName(resourceName) assert.Nil(t, err) } keyIndex := -1 @@ -932,3 +855,40 @@ func labelsToNodeTypeId(labels map[string]string) uint64 { ) return nodeType.GetId() } + +func labelsToNodeType(labels map[string]string) *internaltypes.NodeType { + nodeType := internaltypes.NewNodeType( + []v1.Taint{}, + labels, + util.StringListToSet(testfixtures.TestIndexedTaints), + util.StringListToSet(testfixtures.TestIndexedNodeLabels), + ) + return nodeType +} + +func cpu(cpu string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + }, + ) +} + +func cpuMem(cpu string, memory string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + }, + ) +} + +func cpuMemGpu(cpu string, memory string, gpu string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + "nvidia.com/gpu": resource.MustParse(gpu), + }, + ) +} diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 4a44cf346a0..38c4472aa0a 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -297,6 +297,13 @@ func WithUsedResourcesNodes(p int32, rl schedulerobjects.ResourceList, nodes []* return nodes } +func ItWithUsedResourcesNodes(p int32, rl internaltypes.ResourceList, nodes []*internaltypes.Node) []*internaltypes.Node { + for _, node := range nodes { + internaltypes.MarkAllocated(node.AllocatableByPriority, p, rl) + } + return nodes +} + func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { for _, node := range nodes { if node.Labels == nil { @@ -308,6 +315,78 @@ func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) [ return nodes } +func ItWithLabelsNodes(additionalLabels map[string]string, nodes []*internaltypes.Node) []*internaltypes.Node { + result := make([]*internaltypes.Node, len(nodes)) + for i, node := range nodes { + labels := node.GetLabels() + maps.Copy(labels, additionalLabels) + result[i] = internaltypes.CreateNode(node.GetId(), + node.GetNodeType(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + labels, + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil) + + } + return 
result +} + +func ItWithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltypes.Node) []*internaltypes.Node { + result := make([]*internaltypes.Node, len(nodes)) + for i, node := range nodes { + result[i] = internaltypes.CreateNode(node.GetId(), + nodeType, + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil) + + } + return result +} + +func ItWithIdNodes(nodeId string, nodes []*internaltypes.Node) []*internaltypes.Node { + result := make([]*internaltypes.Node, len(nodes)) + for i, node := range nodes { + result[i] = internaltypes.CreateNode(nodeId, + node.GetNodeType(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil, + ) + + } + return result +} + func WithPriorityJobs(priority uint32, jobs []*jobdb.Job) []*jobdb.Job { for i, job := range jobs { jobs[i] = job.WithPriority(priority) @@ -714,6 +793,14 @@ func N32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItN32CpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTest32CpuNode(priorities) + } + return rv +} + func NTainted32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { rv := make([]*schedulerobjects.Node, n) for i := 0; i < n; i++ { @@ -730,6 +817,14 @@ func N8GpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItN8GpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTest8GpuNode(priorities) + } + return rv +} + func SingleQueuePriorityOne(name string) []*api.Queue { return []*api.Queue{{Name: name, PriorityFactor: 1.0}} } @@ -754,6 +849,25 @@ func TestNode(priorities []int32, resources map[string]resource.Quantity) *sched } } +func ItTestNode(priorities []int32, resources map[string]resource.Quantity) *internaltypes.Node { + rl := TestNodeFactory.ResourceListFactory().FromNodeProto(resources) + id := uuid.NewString() + return TestNodeFactory.CreateNodeAndType(id, + "executor1", + id, + TestPool, + false, + []v1.Taint{}, + map[string]string{ + TestHostnameLabel: id, + schedulerconfiguration.NodeIdLabel: id, + }, + rl, + map[int32]internaltypes.ResourceList{}, + internaltypes.NewAllocatableByPriorityAndResourceType(priorities, rl)) + +} + func Test32CpuNode(priorities []int32) *schedulerobjects.Node { return TestNode( priorities, @@ -764,6 +878,16 @@ func Test32CpuNode(priorities []int32) *schedulerobjects.Node { ) } +func ItTest32CpuNode(priorities []int32) *internaltypes.Node { + return ItTestNode( + priorities, + map[string]resource.Quantity{ + "cpu": resource.MustParse("32"), + "memory": resource.MustParse("256Gi"), + }, + ) +} + func TestTainted32CpuNode(priorities []int32) *schedulerobjects.Node { node := Test32CpuNode(priorities) node.Taints = []v1.Taint{ @@ -790,6 +914,21 @@ func Test8GpuNode(priorities []int32) *schedulerobjects.Node { return node } +func ItTest8GpuNode(priorities []int32) *internaltypes.Node { + node := ItTestNode( + priorities, + map[string]resource.Quantity{ + "cpu": 
resource.MustParse("64"), + "memory": resource.MustParse("1024Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + ) + return ItWithLabelsNodes( + map[string]string{"gpu": "true"}, + []*internaltypes.Node{node}, + )[0] +} + func WithLastUpdateTimeExecutor(lastUpdateTime time.Time, executor *schedulerobjects.Executor) *schedulerobjects.Executor { executor.LastUpdateTime = lastUpdateTime return executor From 82c97fb7a1c69919889ebf06e8a659eb911a4425 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 12:02:04 +0000 Subject: [PATCH 2/7] progress Signed-off-by: Robert Smith --- .../scheduler/nodedb/nodeiteration_test.go | 369 +++++++----------- .../scheduler/testfixtures/testfixtures.go | 19 + 2 files changed, 166 insertions(+), 222 deletions(-) diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index ec4db6e367f..18f484bf80e 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -380,39 +380,36 @@ func TestNodeTypesIterator(t *testing.T) { const nodeTypeCLabel = "c" const nodeTypeDLabel = "d" - nodeTypeAId := nodeTypeLabelToNodeTypeId(nodeTypeALabel) - nodeTypeBId := nodeTypeLabelToNodeTypeId(nodeTypeBLabel) - nodeTypeCId := nodeTypeLabelToNodeTypeId(nodeTypeCLabel) - - gpuNodeTypeAId := gpuNodeTypeLabelToNodeTypeId(nodeTypeALabel) - gpuNodeTypeBId := gpuNodeTypeLabelToNodeTypeId(nodeTypeBLabel) - gpuNodeTypeCId := gpuNodeTypeLabelToNodeTypeId(nodeTypeCLabel) + nodeTypeA := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "a"}) + nodeTypeB := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "b"}) + nodeTypeC := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "c"}) + nodeTypeD := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "d"}) tests := map[string]struct { - nodes []*schedulerobjects.Node + nodes []*internaltypes.Node nodeTypeIds []uint64 priority int32 - resourceRequests schedulerobjects.ResourceList + resourceRequests internaltypes.ResourceList expected []int }{ "only yield nodes of the right nodeType": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeCLabel, - testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, + testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), ), ), - nodeTypeIds: []uint64{nodeTypeAId, nodeTypeCId}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: schedulerobjects.ResourceList{}, + resourceRequests: testfixtures.TestResourceListFactory.MakeAllZero(), expected: armadaslices.Concatenate( testfixtures.IntRange(0, 0), testfixtures.IntRange(3, 5), @@ -420,294 +417,227 @@ func TestNodeTypesIterator(t *testing.T) { }, "filter nodes with insufficient resources and return in increasing order": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.WithUsedResourcesNodes( - 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - 
testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItWithUsedResourcesNodes(0, + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeCLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeDLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeD, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("14")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("14"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId, nodeTypeBId, nodeTypeCId}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - 
testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId}, + nodeTypeIds: []uint64{nodeTypeA.GetId()}, priority: 1, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "1Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "2Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - 
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("131Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "131Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "128Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("17"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId}, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - expected: []int{6, 1, 0}, + nodeTypeIds: []uint64{nodeTypeA.GetId()}, + priority: 0, + resourceRequests: cpuMem("16", "128Gi"), + expected: []int{6, 1, 0}, }, "double-nested ordering": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "1Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("2"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", 
"2"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("5"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "5"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - withNodeTypeNodes( - nodeTypeBLabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "2Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "2Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("514Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "514Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "512Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - withNodeTypeNodes( - nodeTypeCLabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("513Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "513Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("33"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpu("33"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), ), - nodeTypeIds: []uint64{gpuNodeTypeAId, gpuNodeTypeBId, gpuNodeTypeCId}, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - "nvidia.com/gpu": resource.MustParse("4"), - }}, - expected: []int{7, 5, 4, 2, 1, 0}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, + priority: 0, + resourceRequests: cpuMemGpu("32", "512Gi", "4"), + 
expected: []int{7, 5, 4, 2, 1, 0}, }, } for name, tc := range tests { @@ -718,13 +648,9 @@ func TestNodeTypesIterator(t *testing.T) { entries := make([]*internaltypes.Node, len(tc.nodes)) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. - node.Id = fmt.Sprintf("%d", i) - - entry, err := internaltypes.FromSchedulerObjectsNode(node, - uint64(i), - nodeDb.indexedTaints, - nodeDb.indexedNodeLabels, - nodeDb.resourceListFactory) + nodeId := fmt.Sprintf("%d", i) + entry := testfixtures.ItWithIdNodes(nodeId, []*internaltypes.Node{node})[0] + entry = testfixtures.ItWithIndexNode(uint64(i), entry) require.NoError(t, err) @@ -734,8 +660,7 @@ func TestNodeTypesIterator(t *testing.T) { } require.NoError(t, nodeDb.UpsertMany(entries)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) - assert.Nil(t, err) + rr := tc.resourceRequests indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) for i, resourceName := range testfixtures.TestResourceNames { diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 38c4472aa0a..c37fc2d7f5f 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -387,6 +387,25 @@ func ItWithIdNodes(nodeId string, nodes []*internaltypes.Node) []*internaltypes. return result } +func ItWithIndexNode(idx uint64, node *internaltypes.Node) *internaltypes.Node { + return internaltypes.CreateNode(node.GetId(), + node.GetNodeType(), + idx, + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil, + ) +} + func WithPriorityJobs(priority uint32, jobs []*jobdb.Job) []*jobdb.Job { for i, job := range jobs { jobs[i] = job.WithPriority(priority) From 417ccd31a1a65111cc03b9160d47a34e423bdf12 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 15:28:51 +0000 Subject: [PATCH 3/7] progress Signed-off-by: Robert Smith --- .../scheduler/internaltypes/node_factory.go | 54 +++++++- internal/scheduler/nodedb/nodedb_test.go | 131 ++++++++---------- .../scheduler/nodedb/nodeiteration_test.go | 8 +- .../scheduler/testfixtures/testfixtures.go | 57 ++++---- 4 files changed, 144 insertions(+), 106 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 5580b58fa13..5084601195c 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -2,6 +2,7 @@ package internaltypes import ( "fmt" + "sync/atomic" v1 "k8s.io/api/core/v1" @@ -58,10 +59,9 @@ func (f *NodeFactory) CreateNodeAndType( unallocatableResources map[int32]ResourceList, allocatableByPriority map[int32]ResourceList, ) *Node { - f.nodeIndexCounter++ return CreateNodeAndType( id, - f.nodeIndexCounter, + f.allocateNodeIndex(), executor, name, pool, @@ -77,9 +77,8 @@ func (f *NodeFactory) CreateNodeAndType( } func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*Node, error) { - f.nodeIndexCounter++ return FromSchedulerObjectsNode(node, - f.nodeIndexCounter, + f.allocateNodeIndex(), f.indexedTaints, f.indexedNodeLabels, f.resourceListFactory, @@ -108,3 +107,50 @@ func (f *NodeFactory) 
FromSchedulerObjectsExecutors(executors []*schedulerobject func (f *NodeFactory) ResourceListFactory() *ResourceListFactory { return f.resourceListFactory } + +func (f *NodeFactory) AddLabels(nodes []*Node, extraLabels map[string]string) []*Node { + result := make([]*Node, len(nodes)) + for i, node := range nodes { + newLabels := util.MergeMaps(node.GetLabels(), extraLabels) + result[i] = CreateNodeAndType(node.GetId(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + false, + node.GetTaints(), + newLabels, + f.indexedTaints, + f.indexedNodeLabels, + node.totalResources, + node.unallocatableResources, + node.AllocatableByPriority, + ) + } + return result +} + +func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node { + result := make([]*Node, len(nodes)) + for i, node := range nodes { + result[i] = CreateNodeAndType(node.GetId(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + false, + append(node.GetTaints(), extraTaints...), + node.GetLabels(), + f.indexedTaints, + f.indexedNodeLabels, + node.totalResources, + node.unallocatableResources, + node.AllocatableByPriority, + ) + } + return result +} + +func (f *NodeFactory) allocateNodeIndex() uint64 { + return atomic.AddUint64(&f.nodeIndexCounter, 1) +} diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index ca60a4a4417..fe9b737a660 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -28,7 +28,7 @@ func TestNodeDbSchema(t *testing.T) { // Test the accounting of total resources across all nodes. func TestTotalResources(t *testing.T) { - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{}) + nodeDb, err := newNodeDbWithNodes([]*internaltypes.Node{}) require.NoError(t, err) assert.False(t, nodeDb.TotalKubernetesResources().IsEmpty()) @@ -73,8 +73,8 @@ func TestTotalResources(t *testing.T) { } func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { - nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) - nodeId := nodes[1].Id + nodes := testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities) + nodeId := nodes[1].GetId() require.NotEmpty(t, nodeId) db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) @@ -98,8 +98,8 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { } func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { - nodes := testfixtures.N32CpuNodes(1, testfixtures.TestPriorities) - nodeId := nodes[0].Id + nodes := testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities) + nodeId := nodes[0].GetId() require.NotEmpty(t, nodeId) db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) @@ -123,10 +123,10 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { } func TestNodeBindingEvictionUnbinding(t *testing.T) { - node := testfixtures.Test8GpuNode(testfixtures.TestPriorities) - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{node}) + node := testfixtures.ItTest8GpuNode(testfixtures.TestPriorities) + nodeDb, err := newNodeDbWithNodes([]*internaltypes.Node{node}) require.NoError(t, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(t, err) jobFilter := func(job *jobdb.Job) bool { return true } @@ -280,7 +280,7 @@ func TestEviction(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{}) + nodeDb, err := 
newNodeDbWithNodes([]*internaltypes.Node{}) require.NoError(t, err) txn := nodeDb.Txn(true) jobs := []*jobdb.Job{ @@ -313,27 +313,27 @@ func TestEviction(t *testing.T) { func TestScheduleIndividually(t *testing.T) { tests := map[string]struct { - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node Jobs []*jobdb.Job ExpectSuccess []bool }{ "all jobs fit": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), ExpectSuccess: testfixtures.Repeat(true, 32), }, "not all jobs fit": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33), ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, "unavailable resource": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1), ExpectSuccess: testfixtures.Repeat(false, 1), }, "unsupported resource": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.WithRequestsJobs( schedulerobjects.ResourceList{ Resources: map[string]resource.Quantity{ @@ -345,7 +345,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: testfixtures.Repeat(true, 1), // we ignore unknown resource types on jobs, should never happen in practice anyway as these should fail earlier. }, "preemption": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: append( append( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), @@ -356,7 +356,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: append(testfixtures.Repeat(true, 64), testfixtures.Repeat(false, 32)...), }, "taints/tolerations": { - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItNTainted32CpuNodes(1, testfixtures.TestPriorities), Jobs: append( append( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), @@ -367,14 +367,11 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: []bool{false, false, true}, }, "node selector": { - Nodes: append( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes( - map[string]string{ - "key": "value", - }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - )..., + Nodes: testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + map[string]string{ + "key": "value", + }, ), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ @@ -385,11 +382,11 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, "node selector with mismatched value": { - Nodes: testfixtures.WithLabelsNodes( + Nodes: testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{ "key": "value", }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ @@ -400,7 +397,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: 
testfixtures.Repeat(false, 1), }, "node selector with missing label": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "this label does not exist": "value", @@ -411,12 +408,12 @@ func TestScheduleIndividually(t *testing.T) { }, "node affinity": { Nodes: append( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{ "key": "value", }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), )..., ), Jobs: testfixtures.WithNodeAffinityJobs( @@ -485,7 +482,7 @@ func TestScheduleMany(t *testing.T) { tests := map[string]struct { // Nodes to schedule across. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Schedule one group of jobs at a time. // Each group is composed of a slice of pods. Jobs [][]*jobdb.Job @@ -494,18 +491,18 @@ func TestScheduleMany(t *testing.T) { }{ // Attempts to schedule 32. All jobs get scheduled. "simple success": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{gangSuccess}, ExpectSuccess: []bool{true}, }, // Attempts to schedule 33 jobs. The overall result fails. "simple failure with min cardinality": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{gangFailure}, ExpectSuccess: []bool{false}, }, "correct rollback": { - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{ gangSuccess, gangFailure, @@ -514,7 +511,7 @@ func TestScheduleMany(t *testing.T) { ExpectSuccess: []bool{true, false, true}, }, "varying job size": { - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{ append( testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), @@ -724,7 +721,7 @@ func TestMakeIndexedResourceResolution_ErrorsOnInvalidResolution(t *testing.T) { assert.Nil(t, result) } -func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { +func benchmarkUpsert(nodes []*internaltypes.Node, b *testing.B) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -737,11 +734,9 @@ func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { txn := nodeDb.Txn(true) entries := make([]*internaltypes.Node, len(nodes)) for i, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) - require.NoError(b, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(b, err) entries[i] = entry } @@ -754,18 +749,18 @@ func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { } func BenchmarkUpsert1(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), b) } func 
BenchmarkUpsert1000(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), b) } func BenchmarkUpsert100000(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(100000, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(100000, testfixtures.TestPriorities), b) } -func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []*jobdb.Job) { +func benchmarkScheduleMany(b *testing.B, nodes []*internaltypes.Node, jobs []*jobdb.Job) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -777,9 +772,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] require.NoError(b, err) txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(b, err) } txn.Commit() @@ -798,7 +791,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] func BenchmarkScheduleMany10CpuNodes320SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 320), ) } @@ -806,7 +799,7 @@ func BenchmarkScheduleMany10CpuNodes320SmallJobs(b *testing.B) { func BenchmarkScheduleMany10CpuNodes640SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 640), ) } @@ -814,7 +807,7 @@ func BenchmarkScheduleMany10CpuNodes640SmallJobs(b *testing.B) { func BenchmarkScheduleMany100CpuNodes3200SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3200), ) } @@ -822,7 +815,7 @@ func BenchmarkScheduleMany100CpuNodes3200SmallJobs(b *testing.B) { func BenchmarkScheduleMany100CpuNodes6400SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 6400), ) } @@ -830,7 +823,7 @@ func BenchmarkScheduleMany100CpuNodes6400SmallJobs(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes32000SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32000), ) } @@ -838,7 +831,7 @@ func BenchmarkScheduleMany1000CpuNodes32000SmallJobs(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes64000SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 64000), ) } @@ -846,10 +839,10 @@ func BenchmarkScheduleMany1000CpuNodes64000SmallJobs(b *testing.B) { func 
BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 100), ) @@ -858,10 +851,10 @@ func BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1000), ) @@ -870,10 +863,10 @@ func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(10000, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(10000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10000), ) @@ -881,9 +874,9 @@ func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleManyResourceConstrained(b *testing.B) { nodes := append(append( - testfixtures.N32CpuNodes(500, testfixtures.TestPriorities), - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities)...), - testfixtures.N32CpuNodes(499, testfixtures.TestPriorities)..., + testfixtures.ItN32CpuNodes(500, testfixtures.TestPriorities), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities)...), + testfixtures.ItN32CpuNodes(499, testfixtures.TestPriorities)..., ) benchmarkScheduleMany( b, @@ -892,7 +885,7 @@ func BenchmarkScheduleManyResourceConstrained(b *testing.B) { ) } -func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { +func newNodeDbWithNodes(nodes []*internaltypes.Node) (*NodeDb, error) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -906,11 +899,7 @@ func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { } txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - return nil, err - } - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode); err != nil { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { return nil, err } } diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 18f484bf80e..baac6f27ecb 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -710,14 +710,14 @@ func BenchmarkNodeTypeIterator(b *testing.B) { 2, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3, 4, 5, 6, 7, 8, 9, } - nodes := testfixtures.N32CpuNodes(numNodes, testfixtures.TestPriorities) + nodes := testfixtures.ItN32CpuNodes(numNodes, 
testfixtures.TestPriorities) for i, node := range nodes { var q resource.Quantity q.SetMilli(allocatedMilliCpus[i%len(allocatedMilliCpus)]) - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( testfixtures.TestPriorities[len(testfixtures.TestPriorities)-1], - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": q}}, - []*schedulerobjects.Node{node}, + testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"cpu": q}), + []*internaltypes.Node{node}, ) } nodeDb, err := newNodeDbWithNodes(nodes) diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index c37fc2d7f5f..391126beb0c 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -315,31 +315,6 @@ func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) [ return nodes } -func ItWithLabelsNodes(additionalLabels map[string]string, nodes []*internaltypes.Node) []*internaltypes.Node { - result := make([]*internaltypes.Node, len(nodes)) - for i, node := range nodes { - labels := node.GetLabels() - maps.Copy(labels, additionalLabels) - result[i] = internaltypes.CreateNode(node.GetId(), - node.GetNodeType(), - node.GetIndex(), - node.GetExecutor(), - node.GetName(), - node.GetPool(), - node.GetTaints(), - labels, - node.GetTotalResources(), - node.GetUnallocatableResources(), - node.AllocatableByPriority, - node.AllocatedByQueue, - node.AllocatedByJobId, - node.EvictedJobRunIds, - nil) - - } - return result -} - func ItWithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltypes.Node) []*internaltypes.Node { result := make([]*internaltypes.Node, len(nodes)) for i, node := range nodes { @@ -828,6 +803,14 @@ func NTainted32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItNTainted32CpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTestTainted32CpuNode(priorities) + } + return rv +} + func N8GpuNodes(n int, priorities []int32) []*schedulerobjects.Node { rv := make([]*schedulerobjects.Node, n) for i := 0; i < n; i++ { @@ -920,6 +903,26 @@ func TestTainted32CpuNode(priorities []int32) *schedulerobjects.Node { return node } +func ItTestTainted32CpuNode(priorities []int32) *internaltypes.Node { + node := ItTest32CpuNode(priorities) + + node = TestNodeFactory.AddTaints([]*internaltypes.Node{node}, + []v1.Taint{ + { + Key: "largeJobsOnly", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + })[0] + + node = TestNodeFactory.AddLabels( + []*internaltypes.Node{node}, + map[string]string{"largeJobsOnly": "true"}, + )[0] + + return node +} + func Test8GpuNode(priorities []int32) *schedulerobjects.Node { node := TestNode( priorities, @@ -942,9 +945,9 @@ func ItTest8GpuNode(priorities []int32) *internaltypes.Node { "nvidia.com/gpu": resource.MustParse("8"), }, ) - return ItWithLabelsNodes( - map[string]string{"gpu": "true"}, + return TestNodeFactory.AddLabels( []*internaltypes.Node{node}, + map[string]string{"gpu": "true"}, )[0] } From 831ba4ff9b4f62012bca7d65eb00727500f81be1 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 15:46:17 +0000 Subject: [PATCH 4/7] progress Signed-off-by: Robert Smith --- internal/scheduler/nodedb/nodedb_test.go | 55 ++++++++----------- .../scheduler/nodedb/nodeiteration_test.go | 16 ------ 2 files changed, 23 insertions(+), 48 deletions(-) diff 
--git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index fe9b737a660..8a08e6b1443 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -34,42 +34,34 @@ func TestTotalResources(t *testing.T) { assert.False(t, nodeDb.TotalKubernetesResources().IsEmpty()) assert.True(t, nodeDb.TotalKubernetesResources().AllZero()) - expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)} + expected := testfixtures.TestNodeFactory.ResourceListFactory().MakeAllZero() // Upserting nodes for the first time should increase the resource count. - nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) + nodes := testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities) for _, node := range nodes { - expected.Add(node.TotalResources) + expected = expected.Add(node.GetTotalResources()) } txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() - assert.True(t, expected.Equal(schedulerobjects.ResourceList{ - Resources: nodeDb.TotalKubernetesResources().ToMap(), - })) + assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) // Upserting new nodes should increase the resource count. - nodes = testfixtures.N8GpuNodes(3, testfixtures.TestPriorities) + nodes = testfixtures.ItN8GpuNodes(3, testfixtures.TestPriorities) for _, node := range nodes { - expected.Add(node.TotalResources) + expected = expected.Add(node.GetTotalResources()) } txn = nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() - assert.True(t, expected.Equal(schedulerobjects.ResourceList{ - Resources: nodeDb.TotalKubernetesResources().ToMap(), - })) + assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) } func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { @@ -287,13 +279,12 @@ func TestEviction(t *testing.T) { testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass0), testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass3), } - node := testfixtures.Test32CpuNode(testfixtures.TestPriorities) - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) + node := testfixtures.ItTest32CpuNode(testfixtures.TestPriorities) require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, node) txn.Commit() require.NoError(t, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(t, err) existingJobs := make([]*jobdb.Job, len(jobs)) @@ -565,18 +556,18 @@ func TestAwayNodeTypes(t *testing.T) { require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) - node := testfixtures.Test32CpuNode([]int32{29000, 30000}) - node.Taints = append( - node.Taints, - v1.Taint{ - Key: "gpu", - Value: "true", - Effect: v1.TaintEffectNoSchedule, + node := testfixtures.ItTest32CpuNode([]int32{29000, 30000}) + node = testfixtures.TestNodeFactory.AddTaints( + []*internaltypes.Node{node}, + []v1.Taint{ + { + 
Key: "gpu", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, }, - ) - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, nil, dbNode)) + )[0] + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, nil, node)) jobId := util.ULID() job := testfixtures.TestJob( diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index baac6f27ecb..dad4f940610 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -15,7 +15,6 @@ import ( "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) @@ -756,21 +755,6 @@ func BenchmarkNodeTypeIterator(b *testing.B) { } } -func withNodeTypeNodes(nodeTypeLabel string, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { - for _, node := range nodes { - node.Labels[testfixtures.NodeTypeLabel] = nodeTypeLabel - } - return nodes -} - -func nodeTypeLabelToNodeTypeId(nodeTypeLabel string) uint64 { - return labelsToNodeTypeId(map[string]string{testfixtures.NodeTypeLabel: nodeTypeLabel}) -} - -func gpuNodeTypeLabelToNodeTypeId(nodeTypeLabel string) uint64 { - return labelsToNodeTypeId(map[string]string{testfixtures.NodeTypeLabel: nodeTypeLabel, "gpu": "true"}) -} - func labelsToNodeTypeId(labels map[string]string) uint64 { nodeType := internaltypes.NewNodeType( []v1.Taint{}, From bfc10f314d38a658bae079aeb3614b7af62bc569 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 15:49:56 +0000 Subject: [PATCH 5/7] tidy Signed-off-by: Robert Smith --- .../internaltypes/resource_list_map_util.go | 1 - internal/scheduler/nodedb/nodeiteration_test.go | 15 --------------- internal/scheduler/testfixtures/testfixtures.go | 3 --- 3 files changed, 19 deletions(-) diff --git a/internal/scheduler/internaltypes/resource_list_map_util.go b/internal/scheduler/internaltypes/resource_list_map_util.go index e8a3e16085f..b851e82c6cc 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util.go +++ b/internal/scheduler/internaltypes/resource_list_map_util.go @@ -62,7 +62,6 @@ func NewAllocatableByPriorityAndResourceType(priorities []int32, rl ResourceList result := map[int32]ResourceList{} for _, priority := range priorities { result[priority] = rl - } return result } diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index dad4f940610..6c5f7e35571 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -374,11 +374,6 @@ func TestNodeTypeIterator(t *testing.T) { } func TestNodeTypesIterator(t *testing.T) { - const nodeTypeALabel = "a" - const nodeTypeBLabel = "b" - const nodeTypeCLabel = "c" - const nodeTypeDLabel = "d" - nodeTypeA := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "a"}) nodeTypeB := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "b"}) nodeTypeC := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "c"}) @@ -755,16 +750,6 @@ func BenchmarkNodeTypeIterator(b *testing.B) { } } -func labelsToNodeTypeId(labels map[string]string) uint64 { - nodeType := internaltypes.NewNodeType( - []v1.Taint{}, - labels, - 
util.StringListToSet(testfixtures.TestIndexedTaints), - util.StringListToSet(testfixtures.TestIndexedNodeLabels), - ) - return nodeType.GetId() -} - func labelsToNodeType(labels map[string]string) *internaltypes.NodeType { nodeType := internaltypes.NewNodeType( []v1.Taint{}, diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 391126beb0c..5c66f044119 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -333,7 +333,6 @@ func ItWithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltype node.AllocatedByJobId, node.EvictedJobRunIds, nil) - } return result } @@ -357,7 +356,6 @@ func ItWithIdNodes(nodeId string, nodes []*internaltypes.Node) []*internaltypes. node.EvictedJobRunIds, nil, ) - } return result } @@ -867,7 +865,6 @@ func ItTestNode(priorities []int32, resources map[string]resource.Quantity) *int rl, map[int32]internaltypes.ResourceList{}, internaltypes.NewAllocatableByPriorityAndResourceType(priorities, rl)) - } func Test32CpuNode(priorities []int32) *schedulerobjects.Node { From 0cf3067212304d4bcc5c27cde00a61022454aea3 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 18:25:54 +0000 Subject: [PATCH 6/7] tidy Signed-off-by: Robert Smith --- .../scheduler/internaltypes/node_factory.go | 8 ++-- .../resource_list_map_util_test.go | 40 +++++++++++++++++++ internal/scheduler/nodedb/nodedb_test.go | 35 ++++------------ .../scheduler/nodedb/nodeiteration_test.go | 2 +- 4 files changed, 52 insertions(+), 33 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 5084601195c..78fbef3d938 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -122,8 +122,8 @@ func (f *NodeFactory) AddLabels(nodes []*Node, extraLabels map[string]string) [] newLabels, f.indexedTaints, f.indexedNodeLabels, - node.totalResources, - node.unallocatableResources, + node.GetTotalResources(), + node.GetUnallocatableResources(), node.AllocatableByPriority, ) } @@ -143,8 +143,8 @@ func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node { node.GetLabels(), f.indexedTaints, f.indexedNodeLabels, - node.totalResources, - node.unallocatableResources, + node.GetTotalResources(), + node.GetUnallocatableResources(), node.AllocatableByPriority, ) } diff --git a/internal/scheduler/internaltypes/resource_list_map_util_test.go b/internal/scheduler/internaltypes/resource_list_map_util_test.go index 046e9cdc235..85457c42a1a 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util_test.go +++ b/internal/scheduler/internaltypes/resource_list_map_util_test.go @@ -88,6 +88,46 @@ func TestNewAllocatableByPriorityAndResourceType(t *testing.T) { assert.Equal(t, int64(2000), result[2].GetByNameZeroIfMissing("cpu")) } +func TestMarkAllocated(t *testing.T) { + factory := testFactory() + m := map[int32]ResourceList{ + 1: testResourceList(factory, "10", "10Gi"), + 2: testResourceList(factory, "20", "20Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + expected := map[int32]ResourceList{ + 1: testResourceList(factory, "8", "8Gi"), + 2: testResourceList(factory, "18", "18Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + MarkAllocated(m, 2, testResourceList(factory, "2", "2Gi")) + assert.Equal(t, expected, m) 
+} + +func TestMarkAllocatable(t *testing.T) { + factory := testFactory() + m := map[int32]ResourceList{ + 1: testResourceList(factory, "10", "10Gi"), + 2: testResourceList(factory, "20", "20Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + expected := map[int32]ResourceList{ + 1: testResourceList(factory, "12", "12Gi"), + 2: testResourceList(factory, "22", "22Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + MarkAllocatable(m, 2, testResourceList(factory, "2", "2Gi")) + assert.Equal(t, expected, m) +} + func testMapAllPositive(factory *ResourceListFactory) map[string]ResourceList { return map[string]ResourceList{ "a": testResourceList(factory, "1", "1Ki"), diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 8a08e6b1443..5e87f425876 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -358,12 +358,13 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: []bool{false, false, true}, }, "node selector": { - Nodes: testfixtures.TestNodeFactory.AddLabels( - testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), - map[string]string{ - "key": "value", - }, - ), + Nodes: append(testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + map[string]string{ + "key": "value", + }, + )...), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "key": "value", @@ -898,28 +899,6 @@ func newNodeDbWithNodes(nodes []*internaltypes.Node) (*NodeDb, error) { return nodeDb, nil } -func itNewNodeDbWithNodes(nodes []*internaltypes.Node) (*NodeDb, error) { - nodeDb, err := NewNodeDb( - testfixtures.TestPriorityClasses, - testfixtures.TestResources, - testfixtures.TestIndexedTaints, - testfixtures.TestIndexedNodeLabels, - testfixtures.TestWellKnownNodeTypes, - testfixtures.TestResourceListFactory, - ) - if err != nil { - return nil, err - } - txn := nodeDb.Txn(true) - for _, node := range nodes { - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { - return nil, err - } - } - txn.Commit() - return nodeDb, nil -} - func BenchmarkNodeDbStringFromPodRequirementsNotMetReason(b *testing.B) { nodeDb := &NodeDb{ podRequirementsNotMetReasonStringCache: make(map[uint64]string, 128), diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 6c5f7e35571..aa699293139 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -38,7 +38,7 @@ func TestNodesIterator(t *testing.T) { for i, node := range tc.Nodes { indexById[node.GetId()] = i } - nodeDb, err := itNewNodeDbWithNodes(tc.Nodes) + nodeDb, err := newNodeDbWithNodes(tc.Nodes) if !assert.NoError(t, err) { return } From eb034ca90a9ef77ae662b935aa2e9daf27d8766d Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Mon, 30 Dec 2024 18:55:26 +0000 Subject: [PATCH 7/7] tidy Signed-off-by: Robert Smith --- internal/scheduler/internaltypes/node_factory.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 78fbef3d938..76d454c11ad 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -31,7 +31,7 @@ type 
NodeFactory struct { resourceListFactory *ResourceListFactory // Used for assigning node index - nodeIndexCounter uint64 + nodeIndexCounter atomic.Uint64 } func NewNodeFactory( @@ -43,7 +43,7 @@ func NewNodeFactory( indexedTaints: util.StringListToSet(indexedTaints), indexedNodeLabels: util.StringListToSet(indexedNodeLabels), resourceListFactory: resourceListFactory, - nodeIndexCounter: 0, + nodeIndexCounter: atomic.Uint64{}, } } @@ -152,5 +152,5 @@ func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node { } func (f *NodeFactory) allocateNodeIndex() uint64 { - return atomic.AddUint64(&f.nodeIndexCounter, 1) + return f.nodeIndexCounter.Add(1) }
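
PATCH 7/7 above replaces the free-function atomic.AddUint64 counter in NodeFactory with the sync/atomic.Uint64 type (available since Go 1.19): its method-based API cannot be accessed non-atomically by accident, go vet flags accidental copies, and its zero value is ready to use. Below is a minimal, self-contained sketch of that pattern, assuming only the Go standard library; the indexAllocator type and next method are illustrative stand-ins and are not part of the Armada code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// indexAllocator is an illustrative stand-in for NodeFactory's node index
// counter; the zero value of atomic.Uint64 is ready to use, so no
// constructor is required.
type indexAllocator struct {
	counter atomic.Uint64
}

// next increments the counter atomically and returns the new value, so the
// first index handed out is 1 (the same behaviour as atomic.AddUint64).
func (a *indexAllocator) next() uint64 {
	return a.counter.Add(1)
}

func main() {
	var a indexAllocator
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = a.next() // concurrent increments are safe without extra locking
		}()
	}
	wg.Wait()
	fmt.Println(a.next()) // prints 101: every increment was observed exactly once
}

As in allocateNodeIndex, Add(1) returns the post-increment value, which is why node indices start at 1 rather than 0 both before and after the change.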