diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index ca2aebe4b..33a7c3182 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -365,7 +365,7 @@ func (cc *ClusterContext) updateSchedulerConfig(conf *configs.SchedulerConfig, r part, ok := cc.partitions[p.Name] if ok { // make sure the new info passes all checks - _, err = newPartitionContext(p, rmID, nil) + _, err = newPartitionContextForValidation(p, rmID, nil) if err != nil { return err } diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index c758667fc..95607ab93 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -118,21 +118,19 @@ func newBlankQueue() *Queue { func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) { queue, err := newConfiguredQueueInternal(conf, parent) if queue != nil { + queue.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem()) log.Log(log.SchedQueue).Info("configured queue added to scheduler", zap.String("queueName", queue.QueuePath)) + queue.queueEvents.SendNewQueueEvent(queue.QueuePath, queue.isManaged) } return queue, err } -// NewConfiguredShadowQueue creates a new queue from scratch based on the configuration and logs at debug level +// NewConfiguredQueueForValidation is used to validate the queue configuration. +// It works similarly to NewConfiguredQueue but neither logs the queue creation nor sends a queue event. // lock free as it cannot be referenced yet -func NewConfiguredShadowQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) { - queue, err := newConfiguredQueueInternal(conf, parent) - if queue != nil { - log.Log(log.SchedQueue).Debug("shadow queue created", - zap.String("queueName", queue.QueuePath)) - } - return queue, err +func NewConfiguredQueueForValidation(conf configs.QueueConfig, parent *Queue) (*Queue, error) { + return newConfiguredQueueInternal(conf, parent) } func newConfiguredQueueInternal(conf configs.QueueConfig, parent *Queue) (*Queue, error) { @@ -164,8 +162,6 @@ func newConfiguredQueueInternal(conf configs.QueueConfig, parent *Queue) (*Queue } else { sq.UpdateQueueProperties() } - sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem()) - sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged) return sq, nil } diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 68e6746f2..e30deb09e 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -79,7 +79,29 @@ type PartitionContext struct { locking.RWMutex } +// newPartitionContextForValidation initializes a shadow partition based on the configuration. +// The shadow partition is used to validate the configuration, it is not used for scheduling. +func newPartitionContextForValidation(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) { + pc, err := newPartitionContextInternal(conf, rmID, cc) + if pc != nil { + if err := pc.initialPartitionFromConfigForValidation(conf); err != nil { + return nil, err + } + } + return pc, err +} + func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) { + pc, err := newPartitionContextInternal(conf, rmID, cc) + if pc != nil { + if err := pc.initialPartitionFromConfig(conf); err != nil { + return nil, err + } + } + return pc, err +} + +func newPartitionContextInternal(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) { if conf.Name == "" || rmID == "" { log.Log(log.SchedPartition).Info("partition cannot be created", zap.String("partition name", conf.Name), @@ -98,13 +120,41 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC foreignAllocs: make(map[string]*objects.Allocation), } pc.partitionManager = newPartitionManager(pc, cc) - if err := pc.initialPartitionFromConfig(conf); err != nil { - return nil, err - } return pc, nil } +// initialPartitionFromConfigForValidation is used to validate the partition configuration. +// It works similarly to initialPartitionFromConfig but neither logs the queue creation, sends a queue event, logs the node sorting policy, +// nor updates user settings. +func (pc *PartitionContext) initialPartitionFromConfigForValidation(conf configs.PartitionConfig) error { + if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { + return fmt.Errorf("partition cannot be created without root queue") + } + + // Setup the queue structure: root first it should be the only queue at this level + // Add the rest of the queue structure recursively + queueConf := conf.Queues[0] + var err error + if pc.root, err = objects.NewConfiguredQueueForValidation(queueConf, nil); err != nil { + return err + } + // recursively add the queues to the root + if err = pc.addQueueForValidation(queueConf.Queues, pc.root); err != nil { + return err + } + + // We need to pass in the locked version of the GetQueue function. + // Placing an application will not have a lock on the partition context. + pc.placementManager = placement.NewPlacementManager(conf.PlacementRules, pc.GetQueue) + // get the user group cache for the partition + pc.userGroupCache = security.GetUserGroupCache("") + pc.updateNodeSortingPolicyForValidation(conf) + pc.updatePreemption(conf) + + return nil +} + // Initialise the partition func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error { if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { @@ -138,6 +188,13 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionCon return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name) } +// updateNodeSortingPolicyForValidation is used to validate the partition configuration. +// It works similarly to updateNodeSortingPolicy but without logging. +// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. +func (pc *PartitionContext) updateNodeSortingPolicyForValidation(conf configs.PartitionConfig) { + pc.updateNodeSortingPolicyInternal(conf) +} + // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) { var configuredPolicy policies.SortingPolicy @@ -150,6 +207,10 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig log.Log(log.SchedPartition).Info("NodeSorting policy set from config", zap.Stringer("policyName", configuredPolicy)) } + pc.updateNodeSortingPolicyInternal(conf) +} + +func (pc *PartitionContext) updateNodeSortingPolicyInternal(conf configs.PartitionConfig) { pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights)) } @@ -191,17 +252,27 @@ func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name) } +func (pc *PartitionContext) addQueueForValidation(conf []configs.QueueConfig, parent *objects.Queue) error { + err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueueForValidation) + return err +} + // Process the config structure and create a queue info tree for this partition func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error { + err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueue) + return err +} + +func (pc *PartitionContext) addQueueInternal(conf []configs.QueueConfig, parent *objects.Queue, newQueueFn func(configs.QueueConfig, *objects.Queue) (*objects.Queue, error)) error { // create the queue at this level for _, queueConf := range conf { - thisQueue, err := objects.NewConfiguredQueue(queueConf, parent) + thisQueue, err := newQueueFn(queueConf, parent) if err != nil { return err } // recursive create the queues below if len(queueConf.Queues) > 0 { - err = pc.addQueue(queueConf.Queues, thisQueue) + err = pc.addQueueInternal(queueConf.Queues, thisQueue, newQueueFn) if err != nil { return err } @@ -224,7 +295,7 @@ func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *o queue := pc.getQueueInternal(pathName) var err error if queue == nil { - queue, err = objects.NewConfiguredShadowQueue(queueConfig, parent) + queue, err = objects.NewConfiguredQueue(queueConfig, parent) } else { err = queue.ApplyConf(queueConfig) }