Skip to content

Commit

Permalink
Small refactoring of scheduling_algo (#231) (#3967)
Browse files Browse the repository at this point in the history
* First step

* Step 2

* Step 4

* Remove change

* Remove change

* Make nodedb and sctx upfront

* Lint

Co-authored-by: James Murkin <[email protected]>
  • Loading branch information
JamesMurkin and James Murkin authored Sep 25, 2024
1 parent c9d3661 commit 8df0eb6
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 238 deletions.
4 changes: 2 additions & 2 deletions internal/scheduler/schedulerobjects/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package schedulerobjects

func (m *Executor) AllRuns() ([]string, error) {
func (m *Executor) AllRuns() []string {
runIds := make([]string, 0)
// add all runids from nodes
for _, node := range m.Nodes {
Expand All @@ -12,5 +12,5 @@ func (m *Executor) AllRuns() ([]string, error) {
for _, runId := range m.UnassignedJobRuns {
runIds = append(runIds, runId)
}
return runIds, nil
return runIds
}
4 changes: 2 additions & 2 deletions internal/scheduler/scheduling/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type priorityClassSchedulingConstraints struct {
MaximumResourcesPerQueue map[string]resource.Quantity
}

func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue, cordonStatusByQueue map[string]bool) SchedulingConstraints {
func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue) SchedulingConstraints {
priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses))
for name, priorityClass := range config.PriorityClasses {
maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue
Expand Down Expand Up @@ -117,7 +117,7 @@ func NewSchedulingConstraints(pool string, totalResources schedulerobjects.Resou
}
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
Cordoned: cordonStatusByQueue[queue.Name],
Cordoned: queue.Cordoned,
}
}

Expand Down
34 changes: 17 additions & 17 deletions internal/scheduler/scheduling/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,36 @@ func TestConstraints(t *testing.T) {
"no-constraints": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{},
map[string]bool{})),
[]*api.Queue{})),
"empty-queue-constraints": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}},
map[string]bool{"queue-1": false})),
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}})),
"within-constraints": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
MaxQueueLookback: 1000,
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}, map[string]bool{"queue-1": false})),
}, []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}})),
"exceeds-queue-priority-class-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{
{
Name: "queue-1",
Name: "queue-1",
Cordoned: false,
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFraction: map[string]float64{"cpu": 0.000001, "memory": 0.9},
},
},
},
}, map[string]bool{"queue-1": false}))
}))
t.expectedCheckConstraintsReason = "resource limit exceeded"
return t
}(),
"exceeds-queue-priority-class-pool-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{
{
Name: "queue-1",
Name: "queue-1",
Cordoned: false,
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{
Expand All @@ -71,7 +71,7 @@ func TestConstraints(t *testing.T) {
},
},
},
}, map[string]bool{"queue-1": false}))
}))
t.expectedCheckConstraintsReason = "resource limit exceeded"
return t
}(),
Expand All @@ -80,15 +80,15 @@ func TestConstraints(t *testing.T) {
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
MaxQueueLookback: 1000,
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{}, map[string]bool{}))
}, []*api.Queue{}))
t.expectedCheckConstraintsReason = "resource limit exceeded"
return t
}(),
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
MaxQueueLookback: 1000,
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}, nil)),
}, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}})),
"one-constraint-per-level-falls-back-as-expected--within-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
"",
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestCapResources(t *testing.T) {
expectedResources schedulerobjects.QuantityByTAndResourceType[string]
}{
"no contraints": {
constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{}, map[string]bool{}),
constraints: NewSchedulingConstraints("pool-1", makeResourceList("1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{}),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
Expand All @@ -152,7 +152,7 @@ func TestCapResources(t *testing.T) {
},
},
},
}, []*api.Queue{}, map[string]bool{}),
}, []*api.Queue{}),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")},
Expand All @@ -166,7 +166,7 @@ func TestCapResources(t *testing.T) {
},
},
},
}, []*api.Queue{}, map[string]bool{}),
}, []*api.Queue{}),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("100", "900Gi")},
Expand All @@ -189,7 +189,7 @@ func TestCapResources(t *testing.T) {
},
},
},
}, map[string]bool{}),
}),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("900", "900Gi")},
Expand All @@ -215,7 +215,7 @@ func TestCapResources(t *testing.T) {
},
},
},
}, map[string]bool{}),
}),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{
"priority-class-1": makeResourceList("1000", "1000Gi"),
Expand Down Expand Up @@ -323,7 +323,7 @@ func makeMultiLevelConstraints() SchedulingConstraints {
},
},
},
}, map[string]bool{})
})
}

func TestScaleQuantity(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling/gang_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func TestGangScheduler(t *testing.T) {
)
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil, map[string]bool{})
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil)
floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources)
require.NoError(t, err)
sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1854,7 +1854,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
)
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil, map[string]bool{})
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil)
sctx.UpdateFairShares()
sch := NewPreemptingQueueScheduler(
sctx,
Expand Down Expand Up @@ -2201,7 +2201,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue])
require.NoError(b, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", nodeDb.TotalResources(), tc.SchedulingConfig, nil, map[string]bool{})
constraints := schedulerconstraints.NewSchedulingConstraints("pool", nodeDb.TotalResources(), tc.SchedulingConfig, nil)
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
Expand Down
7 changes: 3 additions & 4 deletions internal/scheduler/scheduling/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"

"github.com/armadaproject/armada/internal/common/armadacontext"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/scheduler/scheduling/fairness"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
armadaconfiguration "github.com/armadaproject/armada/internal/server/configuration"
Expand Down Expand Up @@ -535,7 +534,7 @@ func TestQueueScheduler(t *testing.T) {
)
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{})
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, q := range tc.Queues {
it := jobRepo.GetJobIterator(q.Name)
Expand Down
Loading

0 comments on commit 8df0eb6

Please sign in to comment.