Restructure scheduler to have scheduling folder (armadaproject#3962)
* Restructure scheduler to have scheduling folder

Move various files into the scheduling folder to better organise the scheduler

Signed-off-by: JamesMurkin <[email protected]>

* Fix imports

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
JamesMurkin authored Sep 23, 2024
1 parent 4e190b7 commit fc19a27
Showing 38 changed files with 218 additions and 221 deletions.
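Every hunk below follows the same two-step pattern: import paths for the packages that moved (context, fairness, and friends) are repointed under internal/scheduler/scheduling, and types from the old schedulerresult package are now reached through the scheduling package itself. As a rough sketch only, shown as the before and after versions of one file (the file, package, and function names are hypothetical, not part of the commit), a typical caller changes like this:

// Before the restructure (hypothetical caller).
package metricsexample

import (
	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
	"github.com/armadaproject/armada/internal/scheduler/schedulerresult"
)

// contexts returns the per-pool scheduling contexts carried by a scheduler result.
func contexts(result schedulerresult.SchedulerResult) []*schedulercontext.SchedulingContext {
	return result.SchedulingContexts
}

// After the restructure, only the import paths and package qualifiers change.
package metricsexample

import (
	"github.com/armadaproject/armada/internal/scheduler/scheduling"
	"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

func contexts(result scheduling.SchedulerResult) []*context.SchedulingContext {
	return result.SchedulingContexts
}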
internal/scheduler/metrics/cycle_metrics.go (4 changes: 2 additions & 2 deletions)
@@ -5,7 +5,7 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/armadaproject/armada/internal/scheduler/schedulerresult"
"github.com/armadaproject/armada/internal/scheduler/scheduling"
)

var (
@@ -169,7 +169,7 @@ func (m *cycleMetrics) ReportReconcileCycleTime(cycleTime time.Duration) {
m.reconciliationCycleTime.Observe(float64(cycleTime.Milliseconds()))
}

- func (m *cycleMetrics) ReportSchedulerResult(result schedulerresult.SchedulerResult) {
+ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) {
// Metrics that depend on pool
for _, schedContext := range result.SchedulingContexts {
pool := schedContext.Pool
internal/scheduler/metrics/cycle_metrics_test.go (8 changes: 4 additions & 4 deletions)
@@ -11,10 +11,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/schedulerresult"
"github.com/armadaproject/armada/internal/scheduler/scheduling"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/scheduler/scheduling/fairness"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

@@ -25,7 +25,7 @@ func TestReportStateTransitions(t *testing.T) {
cpu(100),
configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}})
require.NoError(t, err)
- result := schedulerresult.SchedulerResult{
+ result := scheduling.SchedulerResult{
SchedulingContexts: []*context.SchedulingContext{
{
Pool: "pool1",
internal/scheduler/nodedb/nodedb.go (28 changes: 14 additions & 14 deletions)
@@ -18,11 +18,11 @@ import (
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
koTaint "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/taint"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

const (
@@ -131,7 +131,7 @@ type EvictedJobSchedulingContext struct {
// When choosing on which node to schedule a job that would prevent re-scheduling evicted jobs,
// nodeDb chooses the node that would prevent re-scheduling jobs with as large an index as possible.
Index int
- JobSchedulingContext *schedulercontext.JobSchedulingContext
+ JobSchedulingContext *context.JobSchedulingContext
}

// NodeDb is the scheduler-internal system used to efficiently find nodes on which a pod could be scheduled.
@@ -401,7 +401,7 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*internaltypes.
return obj.(*internaltypes.Node), nil
}

- func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) {
+ func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *context.GangSchedulingContext) (bool, error) {
// Attempt to schedule pods one by one in a transaction.
for _, jctx := range gctx.JobSchedulingContexts {
// In general, we may attempt to schedule a gang multiple times (in
@@ -446,7 +446,7 @@ func deleteEvictedJobSchedulingContextIfExistsWithTxn(txn *memdb.Txn, jobId stri
}

// SelectNodeForJobWithTxn selects a node on which the job can be scheduled.
- func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*internaltypes.Node, error) {
+ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *context.JobSchedulingContext) (*internaltypes.Node, error) {
priorityClass := jctx.Job.PriorityClass()

// If the job has already been scheduled, get the priority at which it was scheduled.
@@ -455,7 +455,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
if !ok {
priority = jctx.Job.PriorityClass().Priority
}
pctx := &schedulercontext.PodSchedulingContext{
pctx := &context.PodSchedulingContext{
Created: time.Now(),
ScheduledAtPriority: priority,
PreemptedAtPriority: MinPriority,
@@ -516,7 +516,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon

func (nodeDb *NodeDb) selectNodeForJobWithTxnAndAwayNodeType(
txn *memdb.Txn,
- jctx *schedulercontext.JobSchedulingContext,
+ jctx *context.JobSchedulingContext,
awayNodeType types.AwayNodeType,
) (node *internaltypes.Node, err error) {
// Save the number of additional tolerations that the job originally had; we
@@ -549,7 +549,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAndAwayNodeType(

func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
txn *memdb.Txn,
- jctx *schedulercontext.JobSchedulingContext,
+ jctx *context.JobSchedulingContext,
) (*internaltypes.Node, error) {
pctx := jctx.PodSchedulingContext

@@ -607,7 +607,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
return nil, nil
}

- func assertPodSchedulingContextNode(pctx *schedulercontext.PodSchedulingContext, node *internaltypes.Node) error {
+ func assertPodSchedulingContextNode(pctx *context.PodSchedulingContext, node *internaltypes.Node) error {
if node != nil {
if pctx.NodeId == "" {
return errors.New("pctx.NodeId not set")
@@ -623,7 +623,7 @@ func assertPodSchedulingContextNode(pctx *schedulercontext.PodSchedulingContext,

func (nodeDb *NodeDb) selectNodeForJobWithUrgencyPreemption(
txn *memdb.Txn,
- jctx *schedulercontext.JobSchedulingContext,
+ jctx *context.JobSchedulingContext,
matchingNodeTypeIds []uint64,
) (*internaltypes.Node, error) {
pctx := jctx.PodSchedulingContext
@@ -658,7 +658,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithUrgencyPreemption(

func (nodeDb *NodeDb) selectNodeForPodAtPriority(
txn *memdb.Txn,
- jctx *schedulercontext.JobSchedulingContext,
+ jctx *context.JobSchedulingContext,
matchingNodeTypeIds []uint64,
priority int32,
) (*internaltypes.Node, error) {
@@ -699,7 +699,7 @@ func (nodeDb *NodeDb) selectNodeForPodAtPriority(

func (nodeDb *NodeDb) selectNodeForPodWithItAtPriority(
it memdb.ResultIterator,
- jctx *schedulercontext.JobSchedulingContext,
+ jctx *context.JobSchedulingContext,
priority int32,
onlyCheckDynamicRequirements bool,
) (*internaltypes.Node, error) {
@@ -743,7 +743,7 @@ func (nodeDb *NodeDb) selectNodeForPodWithItAtPriority(
//
// It does this by considering all evicted jobs in the reverse order they would be scheduled in and
// preventing the jobs that would be scheduled last from being re-scheduled.
- func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*internaltypes.Node, error) {
+ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *context.JobSchedulingContext) (*internaltypes.Node, error) {
type consideredNode struct {
node *internaltypes.Node
availableResource internaltypes.ResourceList
@@ -1022,7 +1022,7 @@ func (nodeDb *NodeDb) unbindJobFromNodeInPlace(job *jobdb.Job, node *internaltyp

// NodeTypesMatchingJob returns a slice with all node types a pod could be scheduled on.
// It also returns the number of nodes excluded by reason for exclusion.
- func (nodeDb *NodeDb) NodeTypesMatchingJob(jctx *schedulercontext.JobSchedulingContext) ([]uint64, map[string]int, error) {
+ func (nodeDb *NodeDb) NodeTypesMatchingJob(jctx *context.JobSchedulingContext) ([]uint64, map[string]int, error) {
var matchingNodeTypeIds []uint64
numExcludedNodesByReason := make(map[string]int)
for _, nodeType := range nodeDb.nodeTypes {
@@ -1113,7 +1113,7 @@ func newAllocatableByPriorityAndResourceType(priorities []int32, rl internaltype
return rv
}

- func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, index int, jctx *schedulercontext.JobSchedulingContext) error {
+ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, index int, jctx *context.JobSchedulingContext) error {
if it, err := txn.Get("evictedJobs", "id", jctx.JobId); err != nil {
return errors.WithStack(err)
} else if obj := it.Next(); obj != nil {
internal/scheduler/nodedb/nodedb_test.go (25 changes: 12 additions & 13 deletions)
@@ -4,8 +4,6 @@ import (
"fmt"
"testing"

"github.com/armadaproject/armada/internal/scheduler/adapters"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
@@ -14,11 +12,12 @@ import (

armadamaps "github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/adapters"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

@@ -71,7 +70,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
- jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
+ jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId(nodeId)
@@ -96,7 +95,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
- jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
+ jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId("non-existent node")
@@ -433,11 +432,11 @@ func TestScheduleIndividually(t *testing.T) {
nodeDb, err := newNodeDbWithNodes(tc.Nodes)
require.NoError(t, err)

- jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.Jobs)
+ jctxs := context.JobSchedulingContextsFromJobs(tc.Jobs)

for i, jctx := range jctxs {
nodeDbTxn := nodeDb.Txn(true)
- gctx := schedulercontext.NewGangSchedulingContext([]*schedulercontext.JobSchedulingContext{jctx})
+ gctx := context.NewGangSchedulingContext([]*context.JobSchedulingContext{jctx})
ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx)
require.NoError(t, err)

@@ -523,8 +522,8 @@ func TestScheduleMany(t *testing.T) {
require.NoError(t, err)
for i, jobs := range tc.Jobs {
nodeDbTxn := nodeDb.Txn(true)
- jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
- gctx := schedulercontext.NewGangSchedulingContext(jctxs)
+ jctxs := context.JobSchedulingContextsFromJobs(jobs)
+ gctx := context.NewGangSchedulingContext(jctxs)
ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx)
require.NoError(t, err)
require.Equal(t, tc.ExpectSuccess[i], ok)
@@ -578,9 +577,9 @@ func TestAwayNodeTypes(t *testing.T) {
"armada-preemptible-away",
testfixtures.Test1Cpu4GiPodReqs(testfixtures.TestQueue, jobId, 30000),
)
- jctx := schedulercontext.JobSchedulingContextFromJob(job)
+ jctx := context.JobSchedulingContextFromJob(job)
require.Empty(t, jctx.AdditionalTolerations)
- gctx := schedulercontext.NewGangSchedulingContext([]*schedulercontext.JobSchedulingContext{jctx})
+ gctx := context.NewGangSchedulingContext([]*context.JobSchedulingContext{jctx})

ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx)
require.NoError(t, err)
@@ -772,8 +771,8 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []

b.ResetTimer()
for n := 0; n < b.N; n++ {
- jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
- gctx := schedulercontext.NewGangSchedulingContext(jctxs)
+ jctxs := context.JobSchedulingContextsFromJobs(jobs)
+ gctx := context.NewGangSchedulingContext(jctxs)
txn := nodeDb.Txn(true)
_, err := nodeDb.ScheduleManyWithTxn(txn, gctx)
txn.Abort()
internal/scheduler/nodedb/nodematching.go (2 changes: 1 addition & 1 deletion)
@@ -7,8 +7,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

const (
Expand Down
internal/scheduler/nodedb/nodematching_test.go (2 changes: 1 addition & 1 deletion)
@@ -7,9 +7,9 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

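Two migration styles appear in this commit: nodedb.go, the metrics code, and repository.go drop the old schedulercontext alias and refer to the package by its package name, context, while nodematching.go and nodematching_test.go keep the schedulercontext alias and simply repoint it at the new path. Downstream code can take either route; a minimal sketch of the alias-preserving style (the file and function below are hypothetical, not part of the commit):

package nodematchingexample

import (
	// Keeping the old alias means call sites stay untouched; only the import path moves.
	schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

// jobID reads the job id from a job scheduling context, exactly as call sites did
// before the restructure.
func jobID(jctx *schedulercontext.JobSchedulingContext) string {
	return jctx.JobId
}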
internal/scheduler/reports/repository.go (30 changes: 15 additions & 15 deletions)
@@ -7,7 +7,7 @@ import (
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
)

type CtxPoolPair[T any] struct {
@@ -16,18 +16,18 @@ type CtxPoolPair[T any] struct {
}

type SchedulingContextRepository struct {
- mostRecentByPool atomic.Pointer[map[string]*schedulercontext.SchedulingContext]
+ mostRecentByPool atomic.Pointer[map[string]*context.SchedulingContext]
mu sync.Mutex
}

func NewSchedulingContextRepository() *SchedulingContextRepository {
- mostRecentByExecutor := make(map[string]*schedulercontext.SchedulingContext)
+ mostRecentByExecutor := make(map[string]*context.SchedulingContext)
rv := &SchedulingContextRepository{}
rv.mostRecentByPool.Store(&mostRecentByExecutor)
return rv
}

- func (r *SchedulingContextRepository) StoreSchedulingContext(sctx *schedulercontext.SchedulingContext) {
+ func (r *SchedulingContextRepository) StoreSchedulingContext(sctx *context.SchedulingContext) {
r.mu.Lock()
defer r.mu.Unlock()
byPool := r.mostRecentByPool.Load()
@@ -36,11 +36,11 @@ func (r *SchedulingContextRepository) StoreSchedulingContext(sctx *schedulercont
r.mostRecentByPool.Store(&byPoolCopy)
}

- func (r *SchedulingContextRepository) QueueSchedulingContext(queue string) []CtxPoolPair[*schedulercontext.QueueSchedulingContext] {
+ func (r *SchedulingContextRepository) QueueSchedulingContext(queue string) []CtxPoolPair[*context.QueueSchedulingContext] {
contextsByPool := *(r.mostRecentByPool.Load())
- ctxs := make([]CtxPoolPair[*schedulercontext.QueueSchedulingContext], 0, len(contextsByPool))
+ ctxs := make([]CtxPoolPair[*context.QueueSchedulingContext], 0, len(contextsByPool))
for _, pool := range sortedKeys(contextsByPool) {
- ctx := CtxPoolPair[*schedulercontext.QueueSchedulingContext]{pool: pool}
+ ctx := CtxPoolPair[*context.QueueSchedulingContext]{pool: pool}
schedulingCtx, ok := contextsByPool[pool].QueueSchedulingContexts[queue]
if ok {
ctx.schedulingCtx = schedulingCtx
@@ -50,11 +50,11 @@ func (r *SchedulingContextRepository) QueueSchedulingContext(queue string) []Ctx
return ctxs
}

- func (r *SchedulingContextRepository) JobSchedulingContext(jobId string) []CtxPoolPair[*schedulercontext.JobSchedulingContext] {
+ func (r *SchedulingContextRepository) JobSchedulingContext(jobId string) []CtxPoolPair[*context.JobSchedulingContext] {
contextsByPool := *(r.mostRecentByPool.Load())
- ctxs := make([]CtxPoolPair[*schedulercontext.JobSchedulingContext], 0, len(contextsByPool))
+ ctxs := make([]CtxPoolPair[*context.JobSchedulingContext], 0, len(contextsByPool))
for _, pool := range sortedKeys(contextsByPool) {
- ctx := CtxPoolPair[*schedulercontext.JobSchedulingContext]{
+ ctx := CtxPoolPair[*context.JobSchedulingContext]{
pool: pool,
schedulingCtx: getSchedulingReportForJob(contextsByPool[pool], jobId),
}
@@ -63,11 +63,11 @@ func (r *SchedulingContextRepository) JobSchedulingContext(jobId string) []CtxPo
return ctxs
}

- func (r *SchedulingContextRepository) RoundSchedulingContext() []CtxPoolPair[*schedulercontext.SchedulingContext] {
+ func (r *SchedulingContextRepository) RoundSchedulingContext() []CtxPoolPair[*context.SchedulingContext] {
contextsByPool := *(r.mostRecentByPool.Load())
- ctxs := make([]CtxPoolPair[*schedulercontext.SchedulingContext], 0, len(contextsByPool))
+ ctxs := make([]CtxPoolPair[*context.SchedulingContext], 0, len(contextsByPool))
for _, pool := range sortedKeys(contextsByPool) {
- ctx := CtxPoolPair[*schedulercontext.SchedulingContext]{
+ ctx := CtxPoolPair[*context.SchedulingContext]{
pool: pool,
schedulingCtx: contextsByPool[pool],
}
@@ -76,7 +76,7 @@ func (r *SchedulingContextRepository) RoundSchedulingContext() []CtxPoolPair[*sc
return ctxs
}

- func getSchedulingReportForJob(sctx *schedulercontext.SchedulingContext, jobId string) *schedulercontext.JobSchedulingContext {
+ func getSchedulingReportForJob(sctx *context.SchedulingContext, jobId string) *context.JobSchedulingContext {
for _, qctx := range sctx.QueueSchedulingContexts {
for _, jctx := range qctx.SuccessfulJobSchedulingContexts {
if jctx.JobId == jobId {
@@ -92,7 +92,7 @@ func getSchedulingReportForJob(sctx *schedulercontext.SchedulingContext, jobId s
return nil
}

- func sortedKeys(s map[string]*schedulercontext.SchedulingContext) []string {
+ func sortedKeys(s map[string]*context.SchedulingContext) []string {
keys := maps.Keys(s)
slices.Sort(keys)
return keys
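The repository above keeps the most recent SchedulingContext per pool behind an atomic.Pointer so that readers never block on the mutex: StoreSchedulingContext copies the map under the lock and atomically swaps the pointer. The copy step is elided from the hunk, so the sketch below fills it in as an assumption (clone the map, then overwrite the entry for sctx.Pool) and uses a simplified stand-in type instead of the real scheduling/context package:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/exp/maps"
)

// SchedulingContext is a simplified stand-in for scheduling/context.SchedulingContext.
type SchedulingContext struct {
	Pool string
}

// SchedulingContextRepository mirrors the copy-on-write shape used in reports/repository.go:
// readers Load() the pointer without locking; writers clone the map under the mutex and swap it.
type SchedulingContextRepository struct {
	mostRecentByPool atomic.Pointer[map[string]*SchedulingContext]
	mu               sync.Mutex
}

func NewSchedulingContextRepository() *SchedulingContextRepository {
	byPool := make(map[string]*SchedulingContext)
	r := &SchedulingContextRepository{}
	r.mostRecentByPool.Store(&byPool)
	return r
}

func (r *SchedulingContextRepository) StoreSchedulingContext(sctx *SchedulingContext) {
	r.mu.Lock()
	defer r.mu.Unlock()
	byPool := r.mostRecentByPool.Load()
	// Assumed copy step (elided in the diff): clone, then replace this pool's entry.
	byPoolCopy := maps.Clone(*byPool)
	byPoolCopy[sctx.Pool] = sctx
	r.mostRecentByPool.Store(&byPoolCopy)
}

// MostRecent is an illustrative lock-free read; the real repository exposes
// Queue/Job/RoundSchedulingContext accessors instead.
func (r *SchedulingContextRepository) MostRecent(pool string) *SchedulingContext {
	return (*r.mostRecentByPool.Load())[pool]
}

func main() {
	r := NewSchedulingContextRepository()
	r.StoreSchedulingContext(&SchedulingContext{Pool: "pool1"})
	fmt.Println(r.MostRecent("pool1").Pool) // prints: pool1
}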
(Diffs for the remaining changed files are not shown here.)
